diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 35ebd5b0cdb..29ea3767809 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -38,8 +38,7 @@ DBGInvoker::DBGInvoker() regFunc("mock_tidb_table", dbgFuncMockTiDBTable); regFunc("drop_tidb_table", dbgFuncDropTiDBTable); - regFunc("set_flush_rows", dbgFuncSetFlushRows); - regFunc("set_deadline_seconds", dbgFuncSetDeadlineSeconds); + regFunc("set_flush_threshold", dbgFuncSetFlushThreshold); regFunc("raft_insert_row", dbgFuncRaftInsertRow); regFunc("raft_insert_rows", dbgFuncRaftInsertRows); diff --git a/dbms/src/Debug/dbgFuncMockTiDBData.cpp b/dbms/src/Debug/dbgFuncMockTiDBData.cpp index 9d5bec53447..73119185d84 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBData.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBData.cpp @@ -17,32 +17,19 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output) +void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output) { - if (args.size() != 1) - throw Exception("Args not matched, should be: threshold-rows", ErrorCodes::BAD_ARGUMENTS); + if (args.size() != 2) + throw Exception("Args not matched, should be: bytes, seconds", ErrorCodes::BAD_ARGUMENTS); - auto rows = safeGet(typeid_cast(*args[0]).value); - TMTContext & tmt = context.getTMTContext(); - tmt.table_flushers.setFlushThresholdRows(rows); - - std::stringstream ss; - ss << "set flush threshold to " << rows << " rows"; - output(ss.str()); -} - -void dbgFuncSetDeadlineSeconds(Context & context, const ASTs & args, DBGInvoker::Printer output) -{ - if (args.size() != 1) - throw Exception("Args not matched, should be: second-uint", ErrorCodes::BAD_ARGUMENTS); - - const UInt64 second = safeGet(typeid_cast(*args[0]).value); + auto bytes = safeGet(typeid_cast(*args[0]).value); + auto seconds = safeGet(typeid_cast(*args[1]).value); TMTContext & tmt = context.getTMTContext(); - tmt.table_flushers.setDeadlineSeconds(second); + tmt.region_partition.setFlushThresholds({{bytes, Seconds(seconds)}}); std::stringstream ss; - ss << "set deadline seconds to " << second << "s"; + ss << "set flush threshold to (" << bytes << " bytes, " << seconds << " seconds)"; output(ss.str()); } diff --git a/dbms/src/Debug/dbgFuncMockTiDBData.h b/dbms/src/Debug/dbgFuncMockTiDBData.h index d30b0d2ab70..6c3b37cac18 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBData.h +++ b/dbms/src/Debug/dbgFuncMockTiDBData.h @@ -12,7 +12,7 @@ namespace DB // Change flush threshold rows // Usage: // ./storages-client.sh "DBGInvoke set_flush_rows(threshold_rows)" -void dbgFuncSetFlushRows(Context & context, const ASTs & args, DBGInvoker::Printer output); +void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::Printer output); // Change flush deadline seconds // Usage: diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp index 635a5409e0a..a63cf13eca1 100644 --- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp +++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp @@ -94,7 +94,6 @@ void dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Prin TMTContext & tmt = context.getTMTContext(); tmt.region_partition.dropRegionsInTable(table_id); - tmt.table_flushers.dropRegionsInTable(table_id); MockTiDB::instance().dropTable(database_name, table_name); diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index a00318788d0..425d3447cf6 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -295,7 +295,7 @@ std::vector> getPartitionRegionRanges( }; TMTContext & tmt = context.getTMTContext(); - tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, callback); + tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, callback); return handle_ranges; } @@ -494,7 +494,7 @@ void dbgFuncCheckRegionCorrect(Context & context, const ASTs & args, DBGInvoker: for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) { std::unordered_map partition_regions; - tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, context, [&](Regions regions) + tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, [&](Regions regions) { for (auto region : regions) partition_regions[region->id()] = region; diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index 6cbb92130b4..d86b21b951c 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -144,8 +144,6 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, false); tmt.kvstore->onServiceCommand(cmds, raft_ctx); } - - tmt.table_flushers.onPutTryFlush(region); } void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID handle_id, Context & context) @@ -167,7 +165,6 @@ void remove(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han addRequestsToRaftCmd(cmds.add_requests(), region_id, key, value, prewrite_ts, commit_ts, true); tmt.kvstore->onServiceCommand(cmds, raft_ctx); - tmt.table_flushers.onPutTryFlush(region); } struct BatchCtrl @@ -271,7 +268,6 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr } tmt.kvstore->onServiceCommand(cmds, raft_ctx); - tmt.table_flushers.onPutTryFlush(region); } } @@ -322,7 +318,7 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_ for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) { TMTContext & tmt = context.getTMTContext(); - tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, context, [&](Regions d){ + tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, [&](Regions d){ regions.insert(regions.end(), d.begin(), d.end()); }); } diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 5d43621e4ab..4810ea34c92 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -34,11 +34,16 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, { RaftContext rctx(&db_context, grpc_context, stream); BackgroundProcessingPool::TaskHandle persist_handle; + BackgroundProcessingPool::TaskHandle flush_handle; + + RegionPartition & region_partition = db_context.getTMTContext().region_partition; + try { kvstore->report(rctx); persist_handle = background_pool.addTask([&, this] { return kvstore->tryPersistAndReport(rctx); }); + flush_handle = background_pool.addTask([&] { return region_partition.tryFlushRegions(); }); enginepb::CommandRequestBatch request; while (stream->Read(&request)) @@ -53,6 +58,8 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, if (persist_handle) background_pool.removeTask(persist_handle); + if (flush_handle) + background_pool.removeTask(flush_handle); return grpc::Status::CANCELLED; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 1fe996ecf7a..68b8118d0c8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -622,7 +622,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( continue; auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition( data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read, - const_cast(context), false, true, query_info.resolve_locks, query_info.read_tso); + false, true, query_info.resolve_locks, query_info.read_tso); if (region_input_stream) res.emplace_back(region_input_stream); } @@ -671,7 +671,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( { auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition( data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read, - const_cast(context), false, true, query_info.resolve_locks, query_info.read_tso); + false, true, query_info.resolve_locks, query_info.read_tso); if (region_input_stream) { BlockInputStreamPtr version_filtered_stream = std::make_shared( diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 69f29f0c1cc..bd4bc5f900c 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -11,8 +11,8 @@ extern const int LOGICAL_ERROR; } // TODO move to Settings.h -//static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes -//static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds +static Seconds REGION_PERSIST_PERIOD(60); // 1 minutes +static Seconds KVSTORE_TRY_PERSIST_PERIOD(10); // 10 seconds KVStore::KVStore(const std::string & data_dir, Context * context) : region_persister(data_dir), log(&Logger::get("KVStore")) { @@ -65,7 +65,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context) } if (tmt_ctx && old_region) - tmt_ctx->region_partition.removeRegion(old_region, *context); + tmt_ctx->region_partition.removeRegion(old_region); region_persister.persist(region); @@ -75,7 +75,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context) } if (tmt_ctx) - tmt_ctx->table_flushers.onPutTryFlush(region); + tmt_ctx->region_partition.applySnapshotRegion(region); } void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx) @@ -134,7 +134,9 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC continue; } - auto [new_region, split_regions, sync] = curr_region->onCommand(cmd, callback); + auto before_cache_bytes = curr_region->dataSize(); + + auto [new_region, split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback); if (curr_region->isPendingRemove()) { @@ -147,10 +149,11 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC continue; } - // Persist current region and split regions, and mange data in partition - // Add to regions map so that queries can see them. + if (!split_regions.empty()) { + // Persist current region and split regions, and mange data in partition + // Add to regions map so that queries can see them. // TODO: support atomic or idempotent operation. { std::lock_guard lock(mutex); @@ -168,23 +171,19 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC } if (tmt_ctx) - { - tmt_ctx->region_partition.splitRegion(curr_region, split_regions, *context); - - tmt_ctx->table_flushers.onPutTryFlush(curr_region); - for (const auto & region : split_regions) - tmt_ctx->table_flushers.onPutTryFlush(region); - } + tmt_ctx->region_partition.splitRegion(curr_region, split_regions); region_persister.persist(curr_region); for (const auto & region : split_regions) region_persister.persist(region); } - else if (sync) + else { if (tmt_ctx) - tmt_ctx->table_flushers.onPutTryFlush(curr_region); - region_persister.persist(curr_region); + tmt_ctx->region_partition.updateRegion(curr_region, before_cache_bytes, table_ids); + + if (sync) + region_persister.persist(curr_region); } if (sync) @@ -218,7 +217,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context) { std::lock_guard lock(mutex); - Poco::Timestamp now; + Timepoint now = Clock::now(); if (now < (last_try_persist_time + KVSTORE_TRY_PERSIST_PERIOD)) return false; last_try_persist_time = now; @@ -229,7 +228,7 @@ bool KVStore::tryPersistAndReport(RaftContext & context) for (const auto & p : regions) { const auto region = p.second; - if (Poco::Timestamp() < (region->lastPersistTime() + REGION_PERSIST_PERIOD)) + if (now < (region->lastPersistTime() + REGION_PERSIST_PERIOD)) continue; persist_job = true; @@ -263,7 +262,7 @@ void KVStore::removeRegion(RegionID region_id, Context * context) region_persister.drop(region_id); if (context) - context->getTMTContext().region_partition.removeRegion(region, *context); + context->getTMTContext().region_partition.removeRegion(region); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 6f781145692..553dfa056ed 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -11,15 +11,11 @@ #include #include #include -#include #include namespace DB { -// TODO move to Settings.h -static constexpr Int64 REGION_PERSIST_PERIOD = 60 * 1000 * 1000; // 1 minutes -static constexpr Int64 KVSTORE_TRY_PERSIST_PERIOD = 10 * 1000 * 1000; // 10 seconds /// TODO: brief design document. class KVStore final : private boost::noncopyable @@ -40,7 +36,6 @@ class KVStore final : private boost::noncopyable // Currently we also trigger region files GC in it. bool tryPersistAndReport(RaftContext & context); - // TODO: Value copy instead of value ref // For test, please do NOT remove. RegionMap & _regions() { return regions; } @@ -54,7 +49,7 @@ class KVStore final : private boost::noncopyable std::mutex mutex; Consistency consistency; - Poco::Timestamp last_try_persist_time{}; + Timepoint last_try_persist_time = Clock::now(); Logger * log; }; diff --git a/dbms/src/Storages/Transaction/LockException.h b/dbms/src/Storages/Transaction/LockException.h index a62ad32f6eb..0fce4f61ec9 100644 --- a/dbms/src/Storages/Transaction/LockException.h +++ b/dbms/src/Storages/Transaction/LockException.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { @@ -8,10 +9,9 @@ namespace DB class LockException : public Exception { public: - explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_)) - {} + explicit LockException(Region::LockInfos && lock_infos_) : lock_infos(std::move(lock_infos_)) {} Region::LockInfos lock_infos; }; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/PartitionDataMover.h b/dbms/src/Storages/Transaction/PartitionDataMover.h index 37e8b836c25..b6f4d8c00ab 100644 --- a/dbms/src/Storages/Transaction/PartitionDataMover.h +++ b/dbms/src/Storages/Transaction/PartitionDataMover.h @@ -1,7 +1,10 @@ #pragma once -#include "Storages/Transaction/Region.h" -#include "Storages/Transaction/TiKVKeyValue.h" +#include + +#include +#include +#include namespace DB { diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp new file mode 100644 index 00000000000..0edd13f4514 --- /dev/null +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -0,0 +1,84 @@ +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +BlockInputStreamPtr RegionPartition::getBlockInputStreamByPartition(TableID table_id, UInt64 partition_id, + const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, + bool remove_on_read, bool learner_read, bool resolve_locks, UInt64 start_ts) +{ + auto & kvstore = context.getTMTContext().kvstore; + Regions partition_regions; + { + std::lock_guard lock(mutex); + + auto & table = getOrCreateTable(table_id); + auto & partition = table.partitions.get()[partition_id]; + + for (auto & region_id : partition.region_ids) + { + auto region = kvstore->getRegion(region_id); + if (!region) + throw Exception("Region " + DB::toString(region_id) + " not found!", ErrorCodes::LOGICAL_ERROR); + partition_regions.push_back(region); + } + } + + if (partition_regions.empty()) + return {}; + + if (learner_read) + { + for (const auto & region : partition_regions) + { + region->wait_index(region->learner_read()); + } + } + + auto schema_fetcher = [&](TableID) { + return std::make_tuple(&table_info, &columns, &ordered_columns); + }; + + RegionBlockReader reader(schema_fetcher, std::move(partition_regions), table_id, remove_on_read, resolve_locks, start_ts); + + BlocksList blocks; + Block block; + TableID current_table_id; + RegionID current_region_id; + Region::LockInfoPtr lock_info; + Region::LockInfos lock_infos; + while (true) + { + std::tie(block, current_table_id, current_region_id, lock_info) = reader.next(); + if (lock_info) + { + lock_infos.emplace_back(std::move(lock_info)); + continue; + } + if (!block || block.rows() == 0) + break; + if (table_id != current_table_id) + throw Exception("RegionPartition::getBlockInputStreamByPartition", ErrorCodes::LOGICAL_ERROR); + blocks.push_back(block); + } + + if (!lock_infos.empty()) + throw LockException(std::move(lock_infos)); + + if (blocks.empty()) + return {}; + + return std::make_shared(std::move(blocks)); +} + + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index e283582f49f..87b90389b7d 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -35,12 +35,10 @@ Region::KVMap::iterator Region::removeDataByWriteIt(const KVMap::iterator & writ } cf_data_size -= data_it->first.dataSize() + data_it->second.dataSize(); - newly_added_rows--; data_cf.erase(data_it); } cf_data_size -= write_it->first.dataSize() + write_it->second.dataSize(); - newly_added_rows--; return write_cf.erase(write_it); } @@ -76,7 +74,6 @@ Region::ReadInfo Region::readDataByWriteIt(const KVMap::iterator & write_it) Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ts) { - // TODO: Should we check data cf as well? for (auto && [key, value] : lock_cf) { auto decode_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); @@ -97,48 +94,50 @@ Region::LockInfoPtr Region::getLockInfo(TableID expected_table_id, UInt64 start_ return nullptr; } -void Region::insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) +TableID Region::insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) { - // TODO: this will be slow, use batch to speed up std::lock_guard lock(mutex); - doInsert(cf, key, value); + return doInsert(cf, key, value); } -void Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) +TableID Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) { // Ignoring all keys other than records. String raw_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); if (!RecordKVFormat::isRecord(raw_key)) - return; + return InvalidTableID; - if (isTiDBSystemTable(RecordKVFormat::getTableId(raw_key))) - return; + auto table_id = RecordKVFormat::getTableId(raw_key); + if (isTiDBSystemTable(table_id)) + return InvalidTableID; auto & map = getCf(cf); auto p = map.try_emplace(key, value); if (!p.second) throw Exception(toString() + " found existing key [" + key.toString() + "]", ErrorCodes::LOGICAL_ERROR); - cf_data_size += key.dataSize() + value.dataSize(); - newly_added_rows += 1; + if (cf != lock_cf_name) + cf_data_size += key.dataSize() + value.dataSize(); + + return table_id; } -void Region::remove(const std::string & cf, const TiKVKey & key) +TableID Region::remove(const std::string & cf, const TiKVKey & key) { - // TODO: this will be slow, use batch insert to speed up std::lock_guard lock(mutex); - doRemove(cf, key); + return doRemove(cf, key); } -void Region::doRemove(const std::string & cf, const TiKVKey & key) +TableID Region::doRemove(const std::string & cf, const TiKVKey & key) { // Ignoring all keys other than records. String raw_key = std::get<0>(RecordKVFormat::decodeTiKVKey(key)); if (!RecordKVFormat::isRecord(raw_key)) - return; + return InvalidTableID; - if (isTiDBSystemTable(RecordKVFormat::getTableId(raw_key))) - return; + auto table_id = RecordKVFormat::getTableId(raw_key); + if (isTiDBSystemTable(table_id)) + return InvalidTableID; auto & map = getCf(cf); auto it = map.find(key); @@ -148,28 +147,29 @@ void Region::doRemove(const std::string & cf, const TiKVKey & key) // tikv gc will delete useless data in write & default cf. if (unlikely(&map == &lock_cf)) LOG_WARNING(log, toString() << " key not found [" << key.toString() << "] in cf " << cf); - return; + return table_id; } map.erase(it); - cf_data_size -= key.dataSize() + it->second.dataSize(); - newly_added_rows--; + if (cf != lock_cf_name) + cf_data_size -= key.dataSize() + it->second.dataSize(); + return table_id; } -UInt64 Region::getIndex() { - return meta.appliedIndex(); -} +UInt64 Region::getIndex() { return meta.appliedIndex(); } RegionPtr Region::splitInto(const RegionMeta & meta) const { - // TODO: remove data in corresponding partition - auto [start_key, end_key] = meta.getRange(); RegionPtr new_region; - if (client != nullptr) { - new_region = std::make_shared(meta, std::make_shared(client->cache, client->client, meta.getRegionVerID())); - } else { + if (client != nullptr) + { + new_region = std::make_shared( + meta, std::make_shared(client->cache, client->client, meta.getRegionVerID())); + } + else + { new_region = std::make_shared(meta); } @@ -181,7 +181,6 @@ RegionPtr Region::splitInto(const RegionMeta & meta) const { new_region->data_cf.emplace(key, value); new_region->cf_data_size += key.dataSize() + value.dataSize(); - new_region->newly_added_rows += 1; } } @@ -193,7 +192,6 @@ RegionPtr Region::splitInto(const RegionMeta & meta) const { new_region->write_cf.emplace(key, value); new_region->cf_data_size += key.dataSize() + value.dataSize(); - new_region->newly_added_rows += 1; } } @@ -204,8 +202,6 @@ RegionPtr Region::splitInto(const RegionMeta & meta) const if (ok) { new_region->lock_cf.emplace(key, value); - new_region->cf_data_size += key.dataSize() + value.dataSize(); - new_region->newly_added_rows += 1; } } @@ -277,8 +273,8 @@ std::pair Region::execBatchSplit(const raft_cmdpb::AdminRequ { const auto & peer = findPeer(region_info, store_id); RegionMeta new_meta(peer, region_info, initialApplyState()); - auto new_region = splitInto(new_meta); - split_regions.emplace_back(new_region); + auto split_region = splitInto(new_meta); + split_regions.emplace_back(split_region); } } @@ -291,7 +287,8 @@ std::pair Region::execBatchSplit(const raft_cmdpb::AdminRequ return {new_region, split_regions}; } -std::tuple, bool> Region::onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) +std::tuple, TableIDSet, bool> Region::onCommand( + const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) { auto & header = cmd.header(); RegionID region_id = header.region_id(); @@ -309,11 +306,13 @@ std::tuple, bool> Region::onCommand(const engi { if (!sync_log) throw Exception("sync_log should be true", ErrorCodes::LOGICAL_ERROR); - return {{}, {}, true}; + return {{}, {}, {}, true}; } if (!checkIndex(index)) - return {{}, {}, false}; + return {{}, {}, {}, false}; + + TableIDSet table_ids; if (cmd.has_admin_request()) { @@ -334,23 +333,10 @@ std::tuple, bool> Region::onCommand(const engi std::tie(new_region, split_regions) = execBatchSplit(request, response); break; case raft_cmdpb::AdminCmdType::CompactLog: - // Ignore - break; case raft_cmdpb::AdminCmdType::ComputeHash: - { - // Currently hash is borken, because data in region will be flush and remove - // auto & raft_local_state = header.context(); - // callback.compute_hash(this->shared_from_this(), index, raft_local_state); - break; - } case raft_cmdpb::AdminCmdType::VerifyHash: - { - // const auto & verify_req = request.verify_hash(); - // auto expected_index = verify_req.index(); - // const auto & expected_hash = verify_req.hash(); - // callback.verify_hash(this->shared_from_this(), expected_index, expected_hash); + // Ignore break; - } default: LOG_ERROR(log, "Unsupported admin command type " << raft_cmdpb::AdminCmdType_Name(type)); break; @@ -372,13 +358,17 @@ std::tuple, bool> Region::onCommand(const engi { const auto & put = req.put(); auto [key, value] = RecordKVFormat::genKV(put); - insert(put.cf(), key, value); + auto table_id = insert(put.cf(), key, value); + if (table_id != InvalidTableID) + table_ids.emplace(table_id); break; } case raft_cmdpb::CmdType::Delete: { const auto & del = req.delete_(); - remove(del.cf(), RecordKVFormat::genKey(del)); + auto table_id = remove(del.cf(), RecordKVFormat::genKey(del)); + if (table_id != InvalidTableID) + table_ids.emplace(table_id); break; } case raft_cmdpb::CmdType::DeleteRange: @@ -390,7 +380,8 @@ std::tuple, bool> Region::onCommand(const engi case raft_cmdpb::CmdType::Prewrite: case raft_cmdpb::CmdType::Invalid: default: - throw Exception("Illegal cmd type: " + raft_cmdpb::CmdType_Name(type), ErrorCodes::LOGICAL_ERROR); + LOG_ERROR(log, "Unsupported command type " << raft_cmdpb::CmdType_Name(type)); + break; } } } @@ -401,7 +392,7 @@ std::tuple, bool> Region::onCommand(const engi if (new_region) (*new_region).meta.setApplied(index, term); - return {new_region, split_regions, sync_log}; + return {new_region, split_regions, table_ids, sync_log}; } size_t Region::serialize(WriteBuffer & buf) @@ -444,7 +435,6 @@ RegionPtr Region::deserialize(ReadBuffer & buf) auto value = TiKVValue::deserialize(buf); region->data_cf.emplace(key, value); region->cf_data_size += key.dataSize() + value.dataSize(); - region->newly_added_rows += 1; } size = readBinary2(buf); @@ -454,7 +444,6 @@ RegionPtr Region::deserialize(ReadBuffer & buf) auto value = TiKVValue::deserialize(buf); region->write_cf.emplace(key, value); region->cf_data_size += key.dataSize() + value.dataSize(); - region->newly_added_rows += 1; } size = readBinary2(buf); @@ -463,8 +452,6 @@ RegionPtr Region::deserialize(ReadBuffer & buf) auto key = TiKVKey::deserialize(buf); auto value = TiKVValue::deserialize(buf); region->lock_cf.emplace(key, value); - region->cf_data_size += key.dataSize() + value.dataSize(); - region->newly_added_rows += 1; } return region; @@ -502,7 +489,6 @@ void Region::calculateCfCrc32(Crc32 & crc32) const { std::lock_guard lock1(mutex); - // TODO: is it calculating key order need? auto crc_cal = [&](const Region::KVMap & map) { for (auto && [key, value] : map) { @@ -522,15 +508,17 @@ bool Region::isPendingRemove() const { return meta.isPendingRemove(); } void Region::setPendingRemove() { meta.setPendingRemove(); } +size_t Region::dataSize() const { return cf_data_size; } + void Region::markPersisted() { - std::lock_guard lock(persist_time_mutex); - last_persist_time = Poco::Timestamp(); + std::lock_guard lock(mutex); + last_persist_time = Clock::now(); } -const Poco::Timestamp & Region::lastPersistTime() const +Timepoint Region::lastPersistTime() const { - std::lock_guard lock(persist_time_mutex); + std::lock_guard lock(mutex); return last_persist_time; } @@ -539,24 +527,23 @@ std::unique_ptr Region::createCommittedScanRemover return std::make_unique(this->shared_from_this(), expected_table_id); } -size_t Region::getNewlyAddedRows() const { return newly_added_rows; } - -void Region::resetNewlyAddedRows() { newly_added_rows = 0; } - std::string Region::toString(bool dump_status) const { return meta.toString(dump_status); } enginepb::CommandResponse Region::toCommandResponse() const { return meta.toCommandResponse(); } RegionRange Region::getRange() const { return meta.getRange(); } -UInt64 Region::learner_read() { +UInt64 Region::learner_read() +{ if (client != nullptr) return client->getReadIndex(); return 0; } -void Region::wait_index(UInt64 index) { - if (client != nullptr) { +void Region::wait_index(UInt64 index) +{ + if (client != nullptr) + { LOG_TRACE(log, "begin to wait learner index : " + std::to_string(index)); meta.wait_index(index); } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 68ec5772489..4e2d6138b0c 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -32,7 +32,7 @@ class Region : public std::enable_shared_from_this const static String write_cf_name; // In both lock_cf and write_cf. - enum CFModifyFlag: UInt8 + enum CFModifyFlag : UInt8 { PutFlag = 'P', DelFlag = 'D', @@ -75,7 +75,8 @@ class Region : public std::enable_shared_from_this : lock(store_->mutex), store(store_), expected_table_id(expected_table_id_), write_map_it(store->write_cf.begin()) {} - // TODO: 3 times finding is slow: hasNext/next/remove + /// Check if next kv exists. + /// Return InvalidTableID if not. TableID hasNext() { if (expected_table_id != InvalidTableID) @@ -107,10 +108,7 @@ class Region : public std::enable_shared_from_this } } - LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) - { - return store->getLockInfo(expected_table_id, start_ts); - } + LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } private: std::lock_guard lock; @@ -125,14 +123,18 @@ class Region : public std::enable_shared_from_this explicit Region(const RegionMeta & meta_) : meta(meta_), client(nullptr), log(&Logger::get("Region")) {} - explicit Region(RegionMeta && meta_, pingcap::kv::RegionClientPtr client_) : meta(std::move(meta_)), client(client_), log(&Logger::get("Region")) {} + explicit Region(RegionMeta && meta_, pingcap::kv::RegionClientPtr client_) + : meta(std::move(meta_)), client(client_), log(&Logger::get("Region")) + {} - explicit Region(const RegionMeta & meta_, const pingcap::kv::RegionClientPtr & client_) : meta(meta_), client(client_), log(&Logger::get("Region")) {} + explicit Region(const RegionMeta & meta_, const pingcap::kv::RegionClientPtr & client_) + : meta(meta_), client(client_), log(&Logger::get("Region")) + {} - void insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); - void remove(const std::string & cf, const TiKVKey & key); + TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); + TableID remove(const std::string & cf, const TiKVKey & key); - std::tuple, bool> onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & persis); + std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & persis); std::unique_ptr createCommittedScanRemover(TableID expected_table_id); @@ -141,8 +143,6 @@ class Region : public std::enable_shared_from_this void calculateCfCrc32(Crc32 & crc32) const; - void markPersisted(); - RegionID id() const; RegionRange getRange() const; @@ -152,10 +152,10 @@ class Region : public std::enable_shared_from_this bool isPendingRemove() const; void setPendingRemove(); - const Poco::Timestamp & lastPersistTime() const; + size_t dataSize() const; - size_t getNewlyAddedRows() const; - void resetNewlyAddedRows(); + void markPersisted(); + Timepoint lastPersistTime() const; void swap(Region & other) { @@ -170,9 +170,6 @@ class Region : public std::enable_shared_from_this cf_data_size = size_t(other.cf_data_size); other.cf_data_size = size_t(cf_data_size); - - newly_added_rows = size_t (other.newly_added_rows); - other.newly_added_rows = size_t (newly_added_rows); } friend bool operator==(const Region & region1, const Region & region2) @@ -193,8 +190,8 @@ class Region : public std::enable_shared_from_this private: // Private methods no need to lock mutex, normally - void doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); - void doRemove(const std::string & cf, const TiKVKey & key); + TableID doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); + TableID doRemove(const std::string & cf, const TiKVKey & key); bool checkIndex(UInt64 index); KVMap & getCf(const std::string & cf); @@ -215,27 +212,16 @@ class Region : public std::enable_shared_from_this KVMap write_cf; KVMap lock_cf; - // Protect CFs only mutable std::mutex mutex; - std::condition_variable cv; - - // Vars below are thread safe - - // The meta_mutex can protect a group operating on meta, during the operating time all reading to meta can be blocked. - // Since only one thread (onCommand) would modify meta for now, we remove meta_mutex. - // mutable std::mutex meta_mutex; RegionMeta meta; pingcap::kv::RegionClientPtr client; - // These two vars are not exactly correct, because they are not protected by mutex when region splitted + // Size of data cf & write cf, without lock cf. std::atomic cf_data_size = 0; - std::atomic newly_added_rows = 0; - // Poco::timestamp is not trivially copyable, use a extra mutex for this member - Poco::Timestamp last_persist_time{0}; - mutable std::mutex persist_time_mutex; + Timepoint last_persist_time = Clock::now(); Logger * log; }; diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 602e068c781..b07ec5ed4c9 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -122,7 +122,8 @@ class RegionConcatedScanRemover bool ever_seen_lock = false; }; -// FIXME: remove_on_read is not safe because cache will be remove before data flush into storage. +// FIXME: remove_on_read is not safe because cache will be removed before data flush into storage. +// If the process reboot during flush, it could lead to data loss. // Please design another mechanism to gracefully remove cache after flush. class RegionBlockReader { diff --git a/dbms/src/Storages/Transaction/RegionPartition.cpp b/dbms/src/Storages/Transaction/RegionPartition.cpp index 4a7828cd0e7..ba269567e52 100644 --- a/dbms/src/Storages/Transaction/RegionPartition.cpp +++ b/dbms/src/Storages/Transaction/RegionPartition.cpp @@ -1,50 +1,121 @@ +#include + +#include +#include #include #include #include #include -#include - -#include -#include -#include - namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; +extern const int UNKNOWN_TABLE; +} // namespace ErrorCodes + +// ============================================================= +// Static methods. +// ============================================================= + +auto getRegionTableIds(const RegionPtr & region) +{ + std::unordered_set table_ids; + { + auto scanner = region->createCommittedScanRemover(InvalidTableID); + while (true) + { + TableID table_id = scanner->hasNext(); + if (table_id == InvalidTableID) + break; + table_ids.emplace(table_id); + scanner->next(); + } + } + return table_ids; } -RegionPartition::RegionPartition(const std::string & parent_path_) : parent_path(parent_path_), log(&Logger::get("RegionPartition")) +Int64 calculatePartitionCacheBytes(KVStore & kvstore, const std::set & region_ids) { - Poco::File dir(parent_path + "tables/"); - if (!dir.exists()) - dir.createDirectories(); + Int64 bytes = 0; + for (auto region_id : region_ids) + { + bytes += kvstore.getRegion(region_id)->dataSize(); + } + return bytes; +} - std::vector file_names; - dir.list(file_names); +// ============================================================= +// Private member functions. +// ============================================================= - // Restore all table mappings and region mappings. - for (auto & name : file_names) +RegionPartition::Table & RegionPartition::getOrCreateTable(TableID table_id) +{ + auto it = tables.find(table_id); + if (it == tables.end()) { - TableID table_id = std::stoull(name); - auto p = tables.try_emplace(table_id, parent_path + "tables/", table_id); - Table & table = p.first->second; - for (PartitionID partition_id = 0; partition_id < table.partitions.get().size(); ++partition_id) + // Load persisted info. + auto & tmt_ctx = context.getTMTContext(); + auto storage = tmt_ctx.storages.get(table_id); + if (!storage) { - auto & partition = table.partitions.get()[partition_id]; - for (auto region_id : partition.region_ids) - { - auto it = regions.find(region_id); - if (it == regions.end()) - std::tie(it, std::ignore) = regions.try_emplace(region_id); - RegionInfo & region_info = it->second; - region_info.table_to_partition.emplace(table_id, partition_id); - } + tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); + storage = tmt_ctx.storages.get(table_id); } + + auto * merge_tree = dynamic_cast(storage.get()); + auto partition_number = merge_tree->getData().settings.mutable_mergetree_partition_number; + + std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id, partition_number); + + auto & table = it->second; + table.partitions.persist(); } + return it->second; +} + +size_t RegionPartition::selectPartitionId(Table & table, RegionID region_id) +{ + // size_t partition_id = rng() % table.partitions.get().size(); + size_t partition_id = (next_partition_id++) % table.partitions.get().size(); + LOG_DEBUG(log, "Table " << table.table_id << " assign region " << region_id << " to partition " << partition_id); + return partition_id; +} + +std::pair RegionPartition::insertRegion(Table & table, size_t partition_id, RegionID region_id) +{ + auto & table_partitions = table.partitions.get(); + // Insert table mapping. + table_partitions[partition_id].region_ids.emplace(region_id); + table.partitions.persist(); + + // Insert region mapping. + auto r_it = regions.find(region_id); + if (r_it == regions.end()) + std::tie(r_it, std::ignore) = regions.try_emplace(region_id); + RegionInfo & region_info = r_it->second; + region_info.table_to_partition.emplace(table.table_id, partition_id); + + return {partition_id, table_partitions[partition_id]}; +} + +std::pair RegionPartition::getOrInsertRegion(TableID table_id, RegionID region_id) +{ + auto & table = getOrCreateTable(table_id); + auto & table_partitions = table.partitions.get(); + + for (size_t partition_index = 0; partition_index < table_partitions.size(); ++partition_index) + { + const auto & partition = table_partitions[partition_index]; + if (partition.region_ids.find(region_id) != partition.region_ids.end()) + return {partition_index, table_partitions[partition_index]}; + } + + // Currently region_id does not exist in any regions in this table, let's insert into one. + size_t partition_id = selectPartitionId(table, region_id); + return insertRegion(table, partition_id, region_id); } void RegionPartition::updateRegionRange(const RegionPtr & region) @@ -88,45 +159,244 @@ void RegionPartition::updateRegionRange(const RegionPtr & region) } } -void RegionPartition::insertRegion(Table & table, size_t partition_id, RegionID region_id) +bool RegionPartition::shouldFlush(const Partition & partition) { - auto & table_partitions = table.partitions.get(); - // Insert table mapping. - table_partitions[partition_id].region_ids.emplace(region_id); - table.partitions.persist(); + if (partition.pause_flush) + return false; + if (partition.must_flush) + return true; + if (!partition.updated || !partition.cache_bytes) + return false; + auto period_time = Clock::now() - partition.last_flush_time; + for (auto && [th_bytes, th_duration] : flush_thresholds) + { + if (partition.cache_bytes >= th_bytes && period_time >= th_duration) + return true; + } + return false; +} - // Insert region mapping. - auto r_it = regions.find(region_id); - if (r_it == regions.end()) - std::tie(r_it, std::ignore) = regions.try_emplace(region_id); - RegionInfo & region_info = r_it->second; - region_info.table_to_partition.emplace(table.table_id, partition_id); +void RegionPartition::flushPartition(TableID table_id, PartitionID partition_id) +{ + if (log->debug()) + { + auto & table = getOrCreateTable(table_id); + auto & partition = table.partitions.get()[partition_id]; + std::string region_ids; + for (auto id : partition.region_ids) + region_ids += DB::toString(id) + ","; + if (!region_ids.empty()) + region_ids.pop_back(); + LOG_DEBUG(log, + "Flush regions - table_id: " + DB::toString(table_id) + ", partition_id: " + DB::toString(partition_id) + ", ~ " + + DB::toString(partition.cache_bytes) + " bytes, containing region_ids: " + region_ids); + } + + TMTContext & tmt = context.getTMTContext(); + tmt.getSchemaSyncer()->syncSchema(table_id, context); + + StoragePtr storage = tmt.storages.get(table_id); + + // TODO: handle if storage is nullptr + // drop table and create another with same name, but the previous one will still flush + if (storage == nullptr) + { + + LOG_ERROR(log, "table " << table_id << " flush partition " << partition_id << " , but storage is not found"); + return; + } + + auto * merge_tree = dynamic_cast(storage.get()); + + const auto & table_info = merge_tree->getTableInfo(); + const auto & columns = merge_tree->getColumns(); + // TODO: confirm names is right + Names names = columns.getNamesOfPhysical(); + + BlockInputStreamPtr input = getBlockInputStreamByPartition(table_id, partition_id, table_info, columns, names, true, false, false, 0); + if (!input) + return; + + TxnMergeTreeBlockOutputStream output(*merge_tree, partition_id); + input->readPrefix(); + output.writePrefix(); + while (true) + { + Block block = input->read(); + if (!block || block.rows() == 0) + break; + output.write(block); + } + output.writeSuffix(); + input->readSuffix(); +} + +// ============================================================= +// Public member functions. +// ============================================================= + +static const Int64 FTH_BYTES_1 = 1024; // 1 KB +static const Int64 FTH_BYTES_2 = 1024 * 1024; // 1 MB +static const Int64 FTH_BYTES_3 = 1024 * 1024 * 10; // 10 MBs +static const Int64 FTH_BYTES_4 = 1024 * 1024 * 50; // 50 MBs + +static const Seconds FTH_PERIOD_1(60 * 60); // 1 hour +static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes +static const Seconds FTH_PERIOD_3(60); // 1 minute +static const Seconds FTH_PERIOD_4(5); // 5 seconds + +RegionPartition::RegionPartition(Context & context_, const std::string & parent_path_, std::function region_fetcher) + : parent_path(parent_path_), + flush_thresholds{ + {FTH_BYTES_1, FTH_PERIOD_1}, + {FTH_BYTES_2, FTH_PERIOD_2}, + {FTH_BYTES_3, FTH_PERIOD_3}, + {FTH_BYTES_4, FTH_PERIOD_4}, + }, + context(context_), + log(&Logger::get("RegionPartition")) +{ + Poco::File dir(parent_path + "tables/"); + if (!dir.exists()) + dir.createDirectories(); + + std::vector file_names; + dir.list(file_names); + + // Restore all table mappings and region mappings. + for (auto & name : file_names) + { + TableID table_id = std::stoull(name); + auto p = tables.try_emplace(table_id, parent_path + "tables/", table_id); + Table & table = p.first->second; + for (PartitionID partition_id = 0; partition_id < table.partitions.get().size(); ++partition_id) + { + auto & partition = table.partitions.get()[partition_id]; + for (auto region_id : partition.region_ids) + { + // Update cache infos + auto region_ptr = region_fetcher(region_id); + if (!region_ptr) + throw Exception("Region with id " + DB::toString(region_id) + " not found", ErrorCodes::LOGICAL_ERROR); + partition.cache_bytes += region_ptr->dataSize(); + + // Update region_id -> table_id & partition_id + auto it = regions.find(region_id); + if (it == regions.end()) + std::tie(it, std::ignore) = regions.try_emplace(region_id); + RegionInfo & region_info = it->second; + region_info.table_to_partition.emplace(table_id, partition_id); + } + } + } } -UInt64 RegionPartition::getOrInsertRegion(TableID table_id, RegionID region_id, Context & context) +void RegionPartition::updateRegion(const RegionPtr & region, size_t before_cache_bytes, TableIDSet relative_table_ids) { std::lock_guard lock(mutex); - auto & table = getTable(table_id, context); - auto & table_partitions = table.partitions.get(); + auto region_id = region->id(); + Int64 delta = region->dataSize() - before_cache_bytes; + for (auto table_id : relative_table_ids) + { + auto & partition = getOrInsertRegion(table_id, region_id).second; + partition.updated = true; + partition.cache_bytes += delta; + } +} - for (size_t partition_index = 0; partition_index < table_partitions.size(); ++partition_index) +void RegionPartition::applySnapshotRegion(const RegionPtr & region) +{ + std::lock_guard lock(mutex); + + auto region_id = region->id(); + auto table_ids = getRegionTableIds(region); + for (auto table_id : table_ids) { - const auto & partition = table_partitions[partition_index]; - if (partition.region_ids.find(region_id) != partition.region_ids.end()) - return partition_index; + auto & partition = getOrInsertRegion(table_id, region_id).second; + partition.must_flush = true; + partition.cache_bytes += region->dataSize(); } +} - // Currently region_id does not exist in any regions in this table, let's insert into one. - size_t partition_id = selectPartitionId(table, region_id); - insertRegion(table, partition_id, region_id); - return partition_id; +void RegionPartition::splitRegion(const RegionPtr & region, std::vector split_regions) +{ + struct MoveAction + { + StorageMergeTree * storage; + PartitionID current_partition_id; + PartitionID new_partition_id; + Field start; + Field end; + }; + std::vector move_actions; + + { + std::lock_guard lock(mutex); + + auto region_id = region->id(); + auto it = regions.find(region_id); + + if (it == regions.end()) + { + // If region doesn't exist, usually means it does not contain any data we interested. Just ignore it. + return; + } + + RegionInfo & region_info = it->second; + auto & tmt_ctx = context.getTMTContext(); + for (auto [table_id, current_partition_id] : region_info.table_to_partition) + { + auto storage = tmt_ctx.storages.get(table_id); + if (storage == nullptr) + { + throw Exception("Table " + DB::toString(table_id) + " not found", ErrorCodes::UNKNOWN_TABLE); + } + + auto * merge_tree_storage = dynamic_cast(storage.get()); + + auto & table = getOrCreateTable(table_id); + auto & table_partitions = table.partitions.get(); + // Tables which have only one partition don't need to move data. + if (table_partitions.size() == 1) + continue; + + for (const RegionPtr & split_region : split_regions) + { + const auto range = split_region->getRange(); + // This region definitely does not contain any data in this table. + if (!TiKVRange::checkTableInvolveRange(table_id, range)) + continue; + + // Select another partition other than current_partition_id; + auto split_region_id = split_region->id(); + size_t new_partition_id; + while ((new_partition_id = selectPartitionId(table, split_region_id)) == current_partition_id) {} + + auto [start_field, end_field] = getRegionRangeField(range.first, range.second, table_id); + move_actions.push_back({merge_tree_storage, current_partition_id, new_partition_id, start_field, end_field}); + + auto & partition = insertRegion(table, new_partition_id, split_region_id).second; + // Mark flush flag. + partition.must_flush = true; + } + } + + updateRegionRange(region); + } + + // FIXME: move data should be locked for safety, and do it aysnchronized. + for (const auto & action : move_actions) + { + moveRangeBetweenPartitions(context, action.storage, action.current_partition_id, action.new_partition_id, action.start, action.end); + } } -void RegionPartition::removeRegion(const RegionPtr & region, Context & context) +void RegionPartition::removeRegion(const RegionPtr & region) { std::unordered_map table_partitions; auto region_id = region->id(); + auto region_cache_bytes = region->dataSize(); { std::lock_guard lock(mutex); @@ -143,8 +413,9 @@ void RegionPartition::removeRegion(const RegionPtr & region, Context & context) for (auto [table_id, partition_id] : table_partitions) { - auto & table = getTable(table_id, context); + auto & table = getOrCreateTable(table_id); Partition & partition = table.partitions.get().at(partition_id); + partition.cache_bytes -= region_cache_bytes; partition.region_ids.erase(region_id); table.partitions.persist(); } @@ -168,110 +439,66 @@ void RegionPartition::removeRegion(const RegionPtr & region, Context & context) } } -void learner_read_fn(RegionPtr region) { - Int64 idx = region->learner_read(); - region->wait_index(idx); -} - -BlockInputStreamPtr RegionPartition::getBlockInputStreamByPartition(TableID table_id, UInt64 partition_id, - const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, Context & context, - bool remove_on_read, - bool learner_read, - bool resolve_locks, - UInt64 start_ts) +bool RegionPartition::tryFlushRegions() { - auto & kvstore = context.getTMTContext().kvstore; - Regions partition_regions; + KVStore & kvstore = *context.getTMTContext().kvstore; + std::set> to_flush; { - std::lock_guard lock(mutex); - - auto & table = getTable(table_id, context); - auto & partition = table.partitions.get()[partition_id]; - - for (auto & region_id : partition.region_ids) - { - auto region = kvstore->getRegion(region_id); - if (!region) - throw Exception("Region " + DB::toString(region_id) + " not found!", ErrorCodes::LOGICAL_ERROR); - partition_regions.push_back(region); - } + traversePartitions([&](TableID table_id, PartitionID partition_id, Partition & partition) { + if (shouldFlush(partition)) + { + to_flush.emplace(table_id, partition_id); + // Stop other flush threads. + partition.pause_flush = true; + } + }); } - if (partition_regions.empty()) - return {}; - - if (learner_read) { - std::vector learner_threads; - for (auto region: partition_regions) { - learner_threads.push_back(std::thread(learner_read_fn, region)); - } - for (auto & thread: learner_threads) { - thread.join(); - } + for (auto [table_id, partition_id] : to_flush) + { + flushPartition(table_id, partition_id); } - auto schema_fetcher = [&](TableID) { - // TODO: We may should clone all this vars, to avoid out of live time in multi thread environment - return std::make_tuple(&table_info, &columns, &ordered_columns); - }; - - RegionBlockReader reader(schema_fetcher, std::move(partition_regions), table_id, remove_on_read, resolve_locks, start_ts); - - BlocksList blocks; - Block block; - TableID current_table_id; - RegionID current_region_id; - Region::LockInfoPtr lock_info; - Region::LockInfos lock_infos; - while (true) { - std::tie(block, current_table_id, current_region_id, lock_info) = reader.next(); - if (lock_info) - { - lock_infos.emplace_back(std::move(lock_info)); - continue; - } - if (!block || block.rows() == 0) - break; - if (table_id != current_table_id) - throw Exception("RegionPartition::getBlockInputStreamByPartition", ErrorCodes::LOGICAL_ERROR); - blocks.push_back(block); + // Now reset status infomations. + Timepoint now = Clock::now(); + traversePartitions([&](TableID table_id, PartitionID partition_id, Partition & partition) { + if (to_flush.count({table_id, partition_id})) + { + partition.pause_flush = false; + partition.must_flush = false; + partition.updated = false; + partition.cache_bytes = calculatePartitionCacheBytes(kvstore, partition.region_ids); + partition.last_flush_time = now; + } + }); } - if (!lock_infos.empty()) - throw LockException(std::move(lock_infos)); - - if (blocks.empty()) - return {}; - - return std::make_shared(std::move(blocks)); + return !to_flush.empty(); } -void RegionPartition::dropRegionsInTable(TableID /*table_id*/) -{ - // TODO: impl -} - -void RegionPartition::traverseTablesOfRegion(RegionID region_id, std::function handler) +void RegionPartition::traversePartitions(std::function callback) { std::lock_guard lock(mutex); - - auto it = regions.find(region_id); - if (it == regions.end()) - return; - RegionInfo & region_info = it->second; - for (auto p : region_info.table_to_partition) - handler(p.first); + for (auto && [table_id, table] : tables) + { + size_t id = 0; + for (auto & partition : table.partitions.get()) + { + callback(table_id, id, partition); + ++id; + } + } } -void RegionPartition::traverseRegionsByTablePartition(const TableID table_id, const PartitionID partition_id, - Context& context, std::function callback) +void RegionPartition::traverseRegionsByTablePartition( + const TableID table_id, const PartitionID partition_id, std::function callback) { auto & kvstore = context.getTMTContext().kvstore; Regions partition_regions; { std::lock_guard lock(mutex); - auto & table = getTable(table_id, context); + auto & table = getOrCreateTable(table_id); auto & partition = table.partitions.get()[partition_id]; for (auto & region_id : partition.region_ids) @@ -285,91 +512,15 @@ void RegionPartition::traverseRegionsByTablePartition(const TableID table_id, co callback(partition_regions); } -void RegionPartition::splitRegion(const RegionPtr & region, std::vector split_regions, Context & context) -{ - auto region_id = region->id(); - auto it = regions.find(region_id); - - // We cannot find this region in mapping, it means we have never flushed this region before. - // It is a good news as we don't need to handle data in table partition. - if (it == regions.end()) - return; - - RegionInfo & region_info = it->second; - auto & tmt_ctx = context.getTMTContext(); - for (auto [table_id, current_partition_id] : region_info.table_to_partition) - { - - // TODO Maybe we should lock current table here, but it will cause deadlock. - // So I decide to leave it here until we implement atomic range data move. - - auto storage = tmt_ctx.storages.get(table_id); - if (storage == nullptr) - { - LOG_WARNING(log, "RegionPartition::splitRegion: " << table_id << " does not exist."); - continue; - } - - auto * merge_tree = dynamic_cast(storage.get()); - - auto & table = getTable(table_id, context); - auto & table_partitions = table.partitions.get(); - // Tables which have only one partition don't need to move data. - if (table_partitions.size() == 1) - continue; - - for (const RegionPtr & split_region : split_regions) - { - const auto range = split_region->getRange(); - - // This region definitely does not contain any data in this table. - if (!TiKVRange::checkTableInvolveRange(table_id, range)) - continue; - - // Select another partition other than current_partition_id; - auto split_region_id = split_region->id(); - size_t new_partition_id; - while ((new_partition_id = selectPartitionId(table, split_region_id)) == current_partition_id) {} - - auto [start_field, end_field] = getRegionRangeField(range.first, range.second, table_id); - moveRangeBetweenPartitions(context, merge_tree, current_partition_id, new_partition_id, start_field, end_field); - - insertRegion(table, new_partition_id, split_region_id); - } - } - - updateRegionRange(region); -} - -RegionPartition::Table & RegionPartition::getTable(TableID table_id, Context & context) -{ - auto it = tables.find(table_id); - if (it == tables.end()) - { - // Load persisted info. - auto & tmt_ctx = context.getTMTContext(); - auto storage = tmt_ctx.storages.get(table_id); - if (!storage) - { - tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); - storage = tmt_ctx.storages.get(table_id); - } - - auto * merge_tree = dynamic_cast(storage.get()); - auto partition_number = merge_tree->getData().settings.mutable_mergetree_partition_number; - - std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id, partition_number); - - auto & table = it->second; - table.partitions.persist(); - } - return it->second; -} - void RegionPartition::dumpRegionMap(RegionPartition::RegionMap & res) { std::lock_guard lock(mutex); res = regions; } +void RegionPartition::dropRegionsInTable(TableID /*table_id*/) +{ + // TODO: impl +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionPartition.h b/dbms/src/Storages/Transaction/RegionPartition.h index 8453fdea42a..0575702372f 100644 --- a/dbms/src/Storages/Transaction/RegionPartition.h +++ b/dbms/src/Storages/Transaction/RegionPartition.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -30,48 +31,18 @@ namespace DB class RegionPartition : private boost::noncopyable { public: - RegionPartition(const std::string & parent_path_); - - /// Remove range data of splitted_regions, from corresponding partitions - void splitRegion(const RegionPtr & region, std::vector split_regions, Context & context); - - /// Return partition_id. If region does not exist in this table, choose one partition to insert. - UInt64 getOrInsertRegion(TableID table_id, RegionID region_id, Context & context); - void removeRegion(const RegionPtr & region, Context & context); - - // TODO optimize the params. - BlockInputStreamPtr getBlockInputStreamByPartition(TableID table_id, - UInt64 partition_id, - const TiDB::TableInfo & table_info, - const ColumnsDescription & columns, - const Names & ordered_columns, - Context & context, - bool remove_on_read, - bool learner_read, - bool resolve_locks, - UInt64 start_ts); - - /// This functional only shrink the table range of this region_id, range expand will (only) be done at flush. - /// Note that region update range should not affect the data in storage. - void updateRegionRange(const RegionPtr & region); - - void dropRegionsInTable(TableID table_id); - - void traverseTablesOfRegion(RegionID region_id, std::function callback); - - void traverseRegionsByTablePartition(const TableID table_id, const PartitionID partition_id, Context& context, - std::function callback); - -public: - using RegionIDSet = std::set; - struct Partition { + using RegionIDSet = std::set; RegionIDSet region_ids; + Partition() {} + Partition(const Partition & p) : region_ids(p.region_ids) {} + Partition(RegionIDSet && region_ids_) : region_ids(std::move(region_ids_)) {} + struct Write { - void operator()(Partition p, DB::WriteBuffer & buf) + void operator()(const Partition & p, DB::WriteBuffer & buf) { writeIntBinary(p.region_ids.size(), buf); for (auto id : p.region_ids) @@ -83,20 +54,32 @@ class RegionPartition : private boost::noncopyable struct Read { - auto operator()(DB::ReadBuffer & buf) + Partition operator()(DB::ReadBuffer & buf) { UInt64 size; - Partition p; readIntBinary(size, buf); + RegionIDSet region_ids; for (size_t i = 0; i < size; ++i) { RegionID id; readIntBinary(id, buf); - p.region_ids.insert(id); + region_ids.insert(id); } - return p; + return {std::move(region_ids)}; } }; + + // Statistics to serve flush. + // Note that it is not 100% accurate because one region could belongs to more than one partition by different tables. + // And when a region is updated, we don't distinguish carefully which partition, simply update them all. + // It is not a real issue as do flush on a partition is not harmful. Besides, the situation is very rare that a region belongs to many partitions. + // Those members below are not persisted. + + bool pause_flush = false; + bool must_flush = false; + bool updated = false; + Int64 cache_bytes = 0; + Timepoint last_flush_time = Clock::now(); }; struct Table @@ -131,40 +114,76 @@ class RegionPartition : private boost::noncopyable std::unordered_map table_to_partition; }; - Table & getTable(TableID table_id, Context & context); - - void insertRegion(Table & table, size_t partition_id, RegionID region_id); - - size_t selectPartitionId(Table & table, RegionID region_id) - { - // size_t partition_id = rng() % table.partitions.get().size(); - size_t partition_id = (next_partition_id ++) % table.partitions.get().size(); - LOG_DEBUG(log, "Table " << table.table_id << " assign region " << region_id << " to partition " << partition_id); - return partition_id; - } - using TableMap = std::unordered_map; using RegionMap = std::unordered_map; - - // For debug - void dumpRegionMap(RegionPartition::RegionMap & res); + using FlushThresholds = std::vector>; private: const std::string parent_path; - // TODO: One container mutex + one mutext for per partition will be faster - // Partition info persisting is slow, it's slow when all persistings are under one mutex TableMap tables; RegionMap regions; - mutable std::mutex mutex; + FlushThresholds flush_thresholds; - // TODO fix me: currently we use random to pick one partition here, may need to change that. // std::minstd_rand rng = std::minstd_rand(randomSeed()); - size_t next_partition_id = 0; + Context & context; + + mutable std::mutex mutex; Logger * log; + +private: + Table & getOrCreateTable(TableID table_id); + size_t selectPartitionId(Table & table, RegionID region_id); + std::pair insertRegion(Table & table, size_t partition_id, RegionID region_id); + std::pair getOrInsertRegion(TableID table_id, RegionID region_id); + + /// This functional only shrink the table range of this region_id, range expand will (only) be done at flush. + /// Note that region update range should not affect the data in storage. + void updateRegionRange(const RegionPtr & region); + + bool shouldFlush(const Partition & partition); + void flushPartition(TableID table_id, PartitionID partition_id); + +public: + RegionPartition(Context & context_, const std::string & parent_path_, std::function region_fetcher); + void setFlushThresholds(FlushThresholds flush_thresholds_) { flush_thresholds = std::move(flush_thresholds_); } + + /// After the region is updated (insert or delete KVs). + void updateRegion(const RegionPtr & region, size_t before_cache_bytes, TableIDSet relative_table_ids); + /// A new region arrived by apply snapshot command, this function store the region into selected partitions. + void applySnapshotRegion(const RegionPtr & region); + /// Manage data after region split into split_regions. + /// i.e. split_regions could have assigned to another partitions, we need to move the data belong with them. + void splitRegion(const RegionPtr & region, std::vector split_regions); + /// Remove a region from corresponding partitions. + void removeRegion(const RegionPtr & region); + + /// Try pick some regions and flush. + /// Note that flush is organized by partition. i.e. if a regions is selected to be flushed, all regions belong to its partition will also flushed. + /// This function will be called constantly by background threads. + /// Returns whether this function has done any meaningful job. + bool tryFlushRegions(); + + void traversePartitions(std::function callback); + void traverseRegionsByTablePartition(const TableID table_id, const PartitionID partition_id, std::function callback); + + BlockInputStreamPtr getBlockInputStreamByPartition( // + TableID table_id, + UInt64 partition_id, + const TiDB::TableInfo & table_info, + const ColumnsDescription & columns, + const Names & ordered_columns, + bool remove_on_read, + bool learner_read, + bool resolve_locks, + UInt64 start_ts); + + // For debug + void dumpRegionMap(RegionPartition::RegionMap & res); + void dropRegionsInTable(TableID table_id); }; using RegionPartitionPtr = std::shared_ptr; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index dc859997da2..749a7a04f9b 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -1,18 +1,18 @@ #include #include -#include #include +#include namespace DB { -TMTContext::TMTContext(Context & context, std::vector addrs, size_t flush_deadline_seconds, size_t flush_threshold_rows) : - kvstore(std::make_shared(context.getPath() + "kvstore/", &context)), - table_flushers(context, flush_deadline_seconds, flush_threshold_rows), - region_partition(context.getPath() + "regmap/"), - schema_syncer(std::make_shared()), - pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) : static_cast(new pingcap::pd::Client(addrs))), - region_cache(std::make_shared(pd_client)) +TMTContext::TMTContext(Context & context, std::vector addrs) + : kvstore(std::make_shared(context.getPath() + "kvstore/", &context)), + region_partition(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)), + schema_syncer(std::make_shared()), + pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) + : static_cast(new pingcap::pd::Client(addrs))), + region_cache(std::make_shared(pd_client)) {} SchemaSyncerPtr TMTContext::getSchemaSyncer() const @@ -39,11 +39,10 @@ void TMTContext::setPDClient(pingcap::pd::ClientPtr rhs) pd_client = rhs; } -pingcap::kv::RegionCachePtr TMTContext::getRegionCache() const { - return region_cache; -} +pingcap::kv::RegionCachePtr TMTContext::getRegionCache() const { return region_cache; } -pingcap::kv::RpcClientPtr TMTContext::getRpcClient() { +pingcap::kv::RpcClientPtr TMTContext::getRpcClient() +{ if (rpc_client == nullptr) rpc_client = std::make_shared(); return rpc_client; diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index 856e86b3b97..b4bb7418675 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -1,13 +1,12 @@ #pragma once -#include #include +#include #include #include #include #include -#include namespace DB { @@ -19,12 +18,11 @@ class TMTContext public: KVStorePtr kvstore; TMTStorages storages; - TMTTableFlushers table_flushers; RegionPartition region_partition; public: // TODO: get flusher args from config file - explicit TMTContext(Context & context_, std::vector addrs, size_t deadline_seconds = 5, size_t flush_threshold_rows = 1024); + explicit TMTContext(Context & context_, std::vector addrs); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); @@ -37,10 +35,10 @@ class TMTContext pingcap::kv::RpcClientPtr getRpcClient(); private: - SchemaSyncerPtr schema_syncer; - pingcap::pd::ClientPtr pd_client; + SchemaSyncerPtr schema_syncer; + pingcap::pd::ClientPtr pd_client; pingcap::kv::RegionCachePtr region_cache; - pingcap::kv::RpcClientPtr rpc_client; + pingcap::kv::RpcClientPtr rpc_client; mutable std::mutex mutex; }; diff --git a/dbms/src/Storages/Transaction/TMTTableFlusher.cpp b/dbms/src/Storages/Transaction/TMTTableFlusher.cpp deleted file mode 100644 index 56d5cb20b95..00000000000 --- a/dbms/src/Storages/Transaction/TMTTableFlusher.cpp +++ /dev/null @@ -1,229 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -TMTTableFlusher::TMTTableFlusher(Context & context_, TableID table_id_, size_t flush_threshold_rows_) : - context(context_), table_id(table_id_), flush_threshold_rows(flush_threshold_rows_), - log(&Logger::get("TMTTableFlusher")) -{ - TMTContext & tmt = context.getTMTContext(); - tmt.getSchemaSyncer()->syncSchema(table_id, context); -} - -void TMTTableFlusher::setFlushThresholdRows(size_t flush_threshold_rows) -{ - std::lock_guard lock(mutex); - this->flush_threshold_rows = flush_threshold_rows; -} - -void TMTTableFlusher::onPutNotification(RegionPtr region, size_t put_rows) -{ - std::lock_guard lock(mutex); - - RegionID region_id = region->id(); - TMTContext & tmt = context.getTMTContext(); - PartitionID partition_id = tmt.region_partition.getOrInsertRegion(table_id, region_id, context); - - auto it = partitions.find(partition_id); - if (it == partitions.end()) - it = partitions.emplace(partition_id, TMTTableFlusher::Partition(partition_id, put_rows)).first; - else - it->second.onPut(put_rows); - - if (it->second.cached_rows >= flush_threshold_rows) - asyncFlush(partition_id); -} - -void TMTTableFlusher::tryAsyncFlush(size_t deadline_seconds) -{ - std::lock_guard lock(mutex); - - auto now = TMTTableFlusher::Clock::now(); - for (auto it = partitions.begin(); it != partitions.end(); ++it) - { - if (now - it->second.last_modified_time > std::chrono::seconds(deadline_seconds) && (it->second.cached_rows > 0)) - asyncFlush(it->second.partition_id); - } -} - -void TMTTableFlusher::asyncFlush(UInt64 partition_id) -{ - // TODO: async flush, flush queue, etc - flush(partition_id); - auto it = partitions.find(partition_id); - if (it == partitions.end()) - throw Exception("Partition id(?) not exists", ErrorCodes::LOGICAL_ERROR); - it->second.onFlush(); -} - -void TMTTableFlusher::flush(UInt64 partition_id) -{ - TMTContext & tmt = context.getTMTContext(); - tmt.getSchemaSyncer()->syncSchema(table_id, context); - - StoragePtr storage = tmt.storages.get(table_id); - - // TODO: handle if storage is nullptr - // drop table and create another with same name, but the previous one will still flush - if (storage == nullptr) - { - LOG_ERROR(log, "table " << table_id <<" flush partition " << partition_id << " , but storage is not found"); - return; - } - - StorageMergeTree * merge_tree = dynamic_cast(storage.get()); - - const auto & table_info = merge_tree->getTableInfo(); - const auto & columns = merge_tree->getColumns(); - // TODO: confirm names is right - Names names = columns.getNamesOfPhysical(); - - BlockInputStreamPtr input = tmt.region_partition.getBlockInputStreamByPartition( - table_id, partition_id, table_info, columns, names, context, true, false, false, 0); - if (!input) - return; - - TxnMergeTreeBlockOutputStream output(*merge_tree, partition_id); - input->readPrefix(); - output.writePrefix(); - while (true) - { - Block block = input->read(); - if (!block || block.rows() == 0) - break; - output.write(block); - } - output.writeSuffix(); - input->readSuffix(); -} - -TMTTableFlushers::TMTTableFlushers(Context & context_, size_t deadline_seconds_, - size_t flush_threshold_rows_) : context(context_), deadline_seconds(deadline_seconds_), - flush_threshold_rows(flush_threshold_rows_), interval_thread_stopping(false) -{ - interval_thread = std::thread([&] - { - while (!interval_thread_stopping) - { - // TODO: since we don't have a real async flush implement, we clone flushers to avoid long time locking, for now - FlusherMap cloned; - { - std::lock_guard lock(mutex); - cloned = flushers; - } - - for (auto it = cloned.begin(); it != cloned.end(); ++it) - it->second->tryAsyncFlush(deadline_seconds); - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - }); -} - -TMTTableFlushers::~TMTTableFlushers() -{ - interval_thread_stopping = true; - - if (interval_thread.joinable()) - interval_thread.join(); -} - -void TMTTableFlushers::setFlushThresholdRows(size_t flush_threshold_rows) -{ - std::lock_guard lock(mutex); - this->flush_threshold_rows = flush_threshold_rows; - - for (auto it = flushers.begin(); it != flushers.end(); ++it) - it->second->setFlushThresholdRows(flush_threshold_rows); -} - -void TMTTableFlushers::setDeadlineSeconds(size_t deadline_seconds) -{ - std::lock_guard lock(mutex); - this->deadline_seconds = deadline_seconds; -} - -TMTTableFlusherPtr TMTTableFlushers::getOrCreate(TableID table_id) -{ - std::lock_guard lock(mutex); - - auto it = flushers.find(table_id); - if (it != flushers.end()) - return it->second; - - TMTTableFlusherPtr flusher = std::make_shared(context, - table_id, flush_threshold_rows); - flushers.emplace(table_id, flusher); - return flusher; -} - -void TMTTableFlushers::onPutTryFlush(RegionPtr region) -{ - auto region_id = region->id(); - TMTContext & tmt = context.getTMTContext(); - - // TODO: Revise the way of "finding tables in a given region". - // Now WAR using data CF, which may miss the short value in write CF, so make the value long!!! - // - // const Region::KVMap & data_cf = region->data_cf_(); - // std::unordered_set table_ids; - // for (const auto & kv : data_cf) - // { - // const auto & key = kv.first; - // table_ids.emplace(RecordKVFormat::getTableId(key)); - // } - - // TODO: Use this to instead codes above, and make it faster - std::unordered_set table_ids; - { - auto scanner = region->createCommittedScanRemover(InvalidTableID); - while (true) - { - TableID table_id = scanner->hasNext(); - if (table_id == InvalidTableID) - break; - table_ids.emplace(table_id); - scanner->next(); - } - } - - for (const auto table_id : table_ids) - { - tmt.region_partition.getOrInsertRegion(table_id, region_id, context); - - auto flusher = getOrCreate(table_id); - // This is not strickly the right number of newly added rows, but in most cases it works - flusher->onPutNotification(region, region->getNewlyAddedRows()); - region->resetNewlyAddedRows(); - } - - tmt.region_partition.updateRegionRange(region); - - // TODO: This doesn't work unless we settle down the region/table mapping problem. - // auto handler = [&] (TableID table_id) - // { - // auto flusher = getOrCreate(table_id); - // // This is not strickly the right number of newly added rows, but in most cases it works - // flusher->onPutNotification(region, region->getNewlyAddedRows()); - // region->resetNewlyAddedRows(); - // }; - // tmt.region_partition.traverseTablesOfRegion(region_id, handler); -} - -void TMTTableFlushers::dropRegionsInTable(TableID /*table_id*/) -{ - // TODO: impl -} - -} diff --git a/dbms/src/Storages/Transaction/TMTTableFlusher.h b/dbms/src/Storages/Transaction/TMTTableFlusher.h deleted file mode 100644 index fe29925b9ea..00000000000 --- a/dbms/src/Storages/Transaction/TMTTableFlusher.h +++ /dev/null @@ -1,103 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -class Context; - -class TMTTableFlusher -{ -public: - using Clock = std::chrono::steady_clock; - - struct Partition - { - UInt64 partition_id; - Clock::time_point last_modified_time; - size_t cached_rows; - - Partition() = default; - - Partition(UInt64 partition_id_, size_t cached_rows_) : partition_id(partition_id_), - last_modified_time(Clock::now()), cached_rows(cached_rows_) {} - - void onPut(size_t put_rows) - { - last_modified_time = Clock::now(); - cached_rows += put_rows; - } - - void onFlush() - { - last_modified_time = Clock::now(); - cached_rows = 0; - } - }; - -public: - TMTTableFlusher(Context & context, TableID table_id, size_t flush_threshold_rows); - - void setFlushThresholdRows(size_t flush_threshold_rows); - - void onPutNotification(RegionPtr region, size_t put_rows); - - void tryAsyncFlush(size_t deadline_seconds); - -private: - void asyncFlush(UInt64 partition_id); - void flush(UInt64 partition_id); - -private: - Context & context; - - std::atomic table_id; - std::atomic flush_threshold_rows; - - std::unordered_map partitions; - std::mutex mutex; - - Logger * log; -}; - -using TMTTableFlusherPtr = std::shared_ptr; - - -class TMTTableFlushers -{ -public: - TMTTableFlushers(Context & context, size_t deadline_seconds_, size_t flush_threshold_rows_); - virtual ~TMTTableFlushers(); - - void setFlushThresholdRows(size_t flush_threshold_rows); - void setDeadlineSeconds(size_t deadline_seconds); - - void onPutTryFlush(RegionPtr region); - - void dropRegionsInTable(TableID table_id); - -private: - // Get or create the specified table's partition flusher, the table should be created. - TMTTableFlusherPtr getOrCreate(TableID table_id); - -private: - Context & context; - - std::atomic deadline_seconds; - std::atomic flush_threshold_rows; - - std::thread interval_thread; - std::atomic interval_thread_stopping; - - using FlusherMap = std::unordered_map; - FlusherMap flushers; - std::mutex mutex; -}; - -} diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h index b1bdb0844aa..f313abf3add 100644 --- a/dbms/src/Storages/Transaction/Types.h +++ b/dbms/src/Storages/Transaction/Types.h @@ -1,6 +1,9 @@ #pragma once #include +#include +#include +#include #include @@ -8,6 +11,7 @@ namespace DB { using TableID = Int64; +using TableIDSet = std::unordered_set; enum : TableID { @@ -36,4 +40,9 @@ enum : RegionID InvalidRegionID = 0 }; +using Clock = std::chrono::system_clock; +using Timepoint = Clock::time_point; +using Duration = Clock::duration; +using Seconds = std::chrono::seconds; + } // namespace DB