diff --git a/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp b/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
new file mode 100644
index 00000000000..891b8788abf
--- /dev/null
+++ b/dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
@@ -0,0 +1,83 @@
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+Block RangesFilterBlockInputStream::readImpl()
+{
+    while (true)
+    {
+        Block block = input->read();
+        if (!block)
+            return block;
+
+        if (!block.has(handle_col_name))
+            throw Exception("RangesFilterBlockInputStream: block without _tidb_rowid.", ErrorCodes::LOGICAL_ERROR);
+
+        const ColumnWithTypeAndName & handle_column = block.getByName(handle_col_name);
+        const ColumnInt64 * column = typeid_cast<const ColumnInt64 *>(handle_column.column.get());
+        if (!column)
+        {
+            throw Exception("RangesFilterBlockInputStream: _tidb_rowid column should be type ColumnInt64.", ErrorCodes::LOGICAL_ERROR);
+        }
+
+        size_t rows = block.rows();
+
+        auto handle_begin = column->getElement(0);
+        auto handle_end = column->getElement(rows - 1);
+
+        if (handle_begin >= ranges.second || ranges.first > handle_end)
+            continue;
+
+        if (handle_begin >= ranges.first)
+        {
+            if (handle_end < ranges.second)
+            {
+                return block;
+            }
+            else
+            {
+                size_t pos
+                    = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
+                size_t pop_num = rows - pos;
+                for (size_t i = 0; i < block.columns(); i++)
+                {
+                    ColumnWithTypeAndName & ori_column = block.getByPosition(i);
+                    MutableColumnPtr mutable_holder = (*std::move(ori_column.column)).mutate();
+                    mutable_holder->popBack(pop_num);
+                    ori_column.column = std::move(mutable_holder);
+                }
+            }
+        }
+        else
+        {
+            size_t pos_begin
+                = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.first) - column->getData().cbegin();
+            size_t pos_end = rows;
+            if (handle_end >= ranges.second)
+                pos_end = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
+
+            size_t len = pos_end - pos_begin;
+            if (!len)
+                continue;
+            for (size_t i = 0; i < block.columns(); i++)
+            {
+                ColumnWithTypeAndName & ori_column = block.getByPosition(i);
+                auto new_column = ori_column.column->cloneEmpty();
+                new_column->insertRangeFrom(*ori_column.column, pos_begin, len);
+                ori_column.column = std::move(new_column);
+            }
+        }
+
+        return block;
+    }
+}
+
+} // namespace DB
diff --git a/dbms/src/DataStreams/RangesFilterBlockInputStream.h b/dbms/src/DataStreams/RangesFilterBlockInputStream.h
new file mode 100644
index 00000000000..6a98f791d3b
--- /dev/null
+++ b/dbms/src/DataStreams/RangesFilterBlockInputStream.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+class RangesFilterBlockInputStream : public IProfilingBlockInputStream
+{
+public:
+    RangesFilterBlockInputStream(const BlockInputStreamPtr & input_, const HandleRange & ranges_, const String & handle_col_name_) : input(input_), ranges(ranges_), handle_col_name(handle_col_name_) {}
+
+protected:
+    Block getHeader() const override { return input->getHeader(); }
+
+    bool isGroupedOutput() const override { return input->isGroupedOutput(); }
+
+    bool isSortedOutput() const override { return input->isSortedOutput(); }
+
+    const SortDescription & getSortDescription() const override { return input->getSortDescription(); }
+
+    String getName() const override { return "RangesFilter"; }
+
+    Block readImpl() override;
+
+private:
+    BlockInputStreamPtr input;
+    const HandleRange ranges;
+    const String handle_col_name;
+    Logger * log = &Logger::get("RangesFilterBlockInputStream");
+};
+
+} // namespace DB
diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp
index b53701b36c4..ee3c5726b77 100644
--- a/dbms/src/Debug/DBGInvoker.cpp
+++ b/dbms/src/Debug/DBGInvoker.cpp
@@ -51,10 +51,7 @@ DBGInvoker::DBGInvoker()
 
     regFunc("region_snapshot", dbgFuncRegionSnapshot);
     regFunc("rm_region_data", dbgFuncRegionRmData);
-    regFunc("dump_partition", dbgFuncRegionPartition);
-    regFunc("check_partition", dbgFuncCheckPartitionRegionRows);
-    regFunc("scan_partition", dbgFuncScanPartitionExtraRows);
-    regFunc("check_region_correct", dbgFuncCheckRegionCorrect);
+    regFunc("dump_region", dbgFuncDumpRegion);
 }
 
 void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
diff --git a/dbms/src/Debug/dbgFuncMockTiDBData.cpp b/dbms/src/Debug/dbgFuncMockTiDBData.cpp
index 73119185d84..01618344c25 100644
--- a/dbms/src/Debug/dbgFuncMockTiDBData.cpp
+++ b/dbms/src/Debug/dbgFuncMockTiDBData.cpp
@@ -26,7 +26,7 @@ void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::
     auto seconds = safeGet(typeid_cast(*args[1]).value);
 
     TMTContext & tmt = context.getTMTContext();
-    tmt.region_partition.setFlushThresholds({{bytes, Seconds(seconds)}});
+    tmt.region_table.setFlushThresholds({{bytes, Seconds(seconds)}});
 
     std::stringstream ss;
     ss << "set flush threshold to (" << bytes << " bytes, " << seconds << " seconds)";
diff --git a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp
index a63cf13eca1..77046672a5d 100644
--- a/dbms/src/Debug/dbgFuncMockTiDBTable.cpp
+++ b/dbms/src/Debug/dbgFuncMockTiDBTable.cpp
@@ -93,7 +93,7 @@ void dbgFuncDropTiDBTable(Context & context, const ASTs & args, DBGInvoker::Prin
     }
 
     TMTContext & tmt = context.getTMTContext();
-    tmt.region_partition.dropRegionsInTable(table_id);
+    tmt.region_table.dropRegionsInTable(table_id);
 
     MockTiDB::instance().dropTable(database_name, table_name);
 
diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp
index cc34faaa5df..e8ddeda2575 100644
--- a/dbms/src/Debug/dbgFuncRegion.cpp
+++ b/dbms/src/Debug/dbgFuncRegion.cpp
@@ -176,7 +176,7 @@ std::string getEndKeyString(TableID table_id, const TiKVKey & end_key)
     }
 }
 
-void dbgFuncRegionPartition(Context & context, const ASTs & args, DBGInvoker::Printer output)
+void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output)
 {
     if (args.size() > 1)
     {
@@ -190,18 +190,15 @@ void dbgFuncRegionPartition(Context & context, const ASTs & args, DBGInvoker::Pr
 
     auto & tmt = context.getTMTContext();
 
-    RegionPartition::RegionMap regions;
-    tmt.region_partition.dumpRegionMap(regions);
+    RegionTable::RegionMap regions;
+    tmt.region_table.dumpRegionMap(regions);
 
     for (const auto & it: regions)
    {
         auto region_id = it.first;
-        auto & t2p = it.second.table_to_partition;
-        for (const auto & info: t2p)
+        const auto & table_ids = it.second.tables;
+        for (const auto table_id : table_ids)
         {
-            auto table_id = info.first;
-            auto partition_id = info.second;
-
             std::stringstream region_range_info;
             if (show_region)
             {
@@ -222,8 +219,7 @@ void dbgFuncRegionPartition(Context & context, const ASTs & args, DBGInvoker::Pr
             }
 
             std::stringstream ss;
-            ss << "table #" << table_id << " partition #" << partition_id <<
-                " region #" << region_id << region_range_info.str();
+            ss << "table #" << table_id << " region #" << region_id << region_range_info.str();
             output(ss.str());
         }
     }
@@ -231,7 +227,7 @@ void dbgFuncRegionPartition(Context & context, const ASTs & args, DBGInvoker::Pr void dbgFuncRegionRmData(Context & /*context*/, const ASTs & /*args*/, DBGInvoker::Printer /*output*/) { - // TODO: port from RegionPartitionMgr to RegionPartition + // TODO: port from RegionPartitionMgr to RegionTable /* if (args.size() != 1) throw Exception("Args not matched, should be: region-id", ErrorCodes::BAD_ARGUMENTS); @@ -276,11 +272,11 @@ size_t executeQueryAndCountRows(Context & context,const std::string & query) return count; } -std::vector> getPartitionRegionRanges( - Context & context, TableID table_id, PartitionID partition_id, std::vector * vec = nullptr) +std::vector> getRegionRanges( + Context& context, TableID table_id, std::vector* vec = nullptr) { std::vector> handle_ranges; - std::function callback = [&](Regions regions) + auto callback = [&](Regions regions) { for (auto region : regions) { @@ -296,267 +292,8 @@ std::vector> getPartitionRegionRanges( }; TMTContext & tmt = context.getTMTContext(); - tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, callback); + tmt.region_table.traverseRegionsByTable(table_id, callback); return handle_ranges; } -void dbgFuncCheckPartitionRegionRows(Context & context, const ASTs & args, DBGInvoker::Printer output) -{ - if (args.size() < 2) - { - throw Exception("Args not matched, should be: database-name, table-name, [show-region-ranges]", - ErrorCodes::BAD_ARGUMENTS); - } - - const String & pk_name = MutableSupport::tidb_pk_column_name; - - const String & database_name = typeid_cast(*args[0]).name; - const String & table_name = typeid_cast(*args[1]).name; - - bool show_region = false; - if (args.size() > 2) - show_region = (std::string(typeid_cast(*args[2]).name) == "true"); - - TableID table_id = getTableID(context, database_name, table_name); - const auto partition_number = context.getMergeTreeSettings().mutable_mergetree_partition_number; - - size_t partitions_rows = 0; - - for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) - { - std::vector tikv_keys; - auto handle_ranges = getPartitionRegionRanges(context, table_id, partition_id, &tikv_keys); - std::vector rows_list(handle_ranges.size(), 0); - - size_t regions_rows = 0; - for (size_t i = 0; i < handle_ranges.size(); ++i) - { - const auto [start_handle, end_handle, region_id] = handle_ranges[i]; - (void)region_id; // ignore region_id - std::stringstream query; - query << "SELECT " << pk_name << " FROM " << database_name << "." << table_name << - " PARTITION ('" << partition_id << "') WHERE (" << - start_handle << " <= " << pk_name << ") AND (" << - pk_name << " < " << end_handle << ")"; - size_t rows = executeQueryAndCountRows(context, query.str()); - regions_rows += rows; - rows_list[i] = rows; - } - - std::stringstream query; - query << "SELECT " << pk_name << " FROM " << database_name << "." << - table_name << " PARTITION ('" << partition_id << "')"; - size_t partition_rows = executeQueryAndCountRows(context, query.str()); - - std::stringstream out; - out << ((partition_rows != regions_rows) ? 
"EE" : "OK") << - " partition #" << partition_id << ": " << partition_rows << " rows, " << - handle_ranges.size() << " regions, sum(regions rows): " << regions_rows; - if (show_region && !handle_ranges.empty()) - { - std::stringstream ss; - ss << ", regions:"; - for (size_t i = 0; i< handle_ranges.size(); ++i) - { - const auto & [start_handle, end_handle, region_id] = handle_ranges[i]; - const auto & [start_key, end_key] = tikv_keys[i]; - ss << " #" << region_id << "(" << getRegionKeyString(start_handle, start_key) << - ", " << getRegionKeyString(end_handle, end_key) << ". " << rows_list[i] << " rows)"; - } - out << ss.str(); - } - output(out.str()); - partitions_rows += partition_rows; - } - - output("sum(partitions rows): " + toString(partitions_rows)); -} - -void dbgFuncScanPartitionExtraRows(Context & context, const ASTs & args, DBGInvoker::Printer output) -{ - if (args.size() != 2) - throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS); - - const String & pk_name = MutableSupport::tidb_pk_column_name; - - const String & database_name = typeid_cast(*args[0]).name; - const String & table_name = typeid_cast(*args[1]).name; - - TableID table_id = getTableID(context, database_name, table_name); - const auto partition_number = context.getMergeTreeSettings().mutable_mergetree_partition_number; - - size_t partitions_rows = 0; - - for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) - { - auto handle_ranges = getPartitionRegionRanges(context, table_id, partition_id); - sort(handle_ranges.begin(), handle_ranges.end()); - - std::vector> handle_ranges_check; - - for (int i = 0; i < (int) handle_ranges.size(); ++i) - { - handle_ranges_check.push_back({std::get<0>(handle_ranges[i]), i}); - handle_ranges_check.push_back({std::get<1>(handle_ranges[i]), i}); - } - sort(handle_ranges_check.begin(), handle_ranges_check.end()); - - std::stringstream ss; - ss << "SELECT " << pk_name << " FROM " << database_name << "." 
<< table_name << - " PARTITION ('" << partition_id << "')"; - std::string query = ss.str(); - - Context query_context = context; - query_context.setSessionContext(query_context); - BlockInputStreamPtr input = executeQuery(query, query_context, true, QueryProcessingStage::Complete).in; - - input->readPrefix(); - while (true) - { - Block block = input->read(); - if (!block || block.rows() == 0) - break; - partitions_rows += block.rows(); - - auto col = block.begin()->column; - for (size_t i = 0, n = col->size(); i < n; ++i) - { - Field field = (*col)[i]; - auto handle_id = field.get(); - auto it = std::upper_bound(handle_ranges_check.begin(), handle_ranges_check.end(), - std::make_pair(handle_id, std::numeric_limits::max())); - - bool is_handle_not_found = false; - if (it == handle_ranges_check.end()) - { - is_handle_not_found = true; - } - else - { - int idx = it->second; - if (!(std::get<0>(handle_ranges[idx]) <= handle_id && - std::get<1>(handle_ranges[idx]) > handle_id)) - { - is_handle_not_found = true; - } - } - - if (is_handle_not_found) - { - output("partition #" + toString(partition_id) + " handle " + toString(handle_id) + - " not in the regions of partition"); - } - } - } - input->readSuffix(); - } - - output("sum(partitions rows): " + toString(partitions_rows)); -} - -void dbgFuncCheckRegionCorrect(Context & context, const ASTs & args, DBGInvoker::Printer output) -{ - if (args.size() != 2) - throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS); - - const String & pk_name = MutableSupport::tidb_pk_column_name; - - const String & database_name = typeid_cast(*args[0]).name; - const String & table_name = typeid_cast(*args[1]).name; - - TableID table_id = getTableID(context, database_name, table_name); - const auto partition_number = context.getMergeTreeSettings().mutable_mergetree_partition_number; - - // TODO: usa a struct - std::vector> handle_ranges; - auto & kvstore = context.getTMTContext().kvstore; - kvstore->traverseRegions([&](Region * region) - { - auto [start_key, end_key] = region->getRange(); - HandleID start_handle = TiKVRange::getRangeHandle(start_key, table_id); - HandleID end_handle = TiKVRange::getRangeHandle(end_key, table_id); - // TODO: use plain ptr? - handle_ranges.push_back({start_handle, end_handle, reinterpret_cast(region)}); - }); - - sort(handle_ranges.begin(), handle_ranges.end()); - - std::vector> handle_ranges_check; - for (int i = 0; i < (int) handle_ranges.size(); ++i) - { - handle_ranges_check.push_back({std::get<0>(handle_ranges[i]), i}); - handle_ranges_check.push_back({std::get<1>(handle_ranges[i]), i}); - } - sort(handle_ranges_check.begin(), handle_ranges_check.end()); - - TMTContext & tmt = context.getTMTContext(); - size_t partitions_rows = 0; - - for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) - { - std::unordered_map partition_regions; - tmt.region_partition.traverseRegionsByTablePartition(table_id, partition_id, [&](Regions regions) - { - for (auto region : regions) - partition_regions[region->id()] = region; - }); - - std::stringstream query; - query << "SELECT " << pk_name << " FROM " << database_name << "." 
<< table_name << - " PARTITION ('" << partition_id << "')"; - Context query_context = context; - query_context.setSessionContext(query_context); - BlockInputStreamPtr input = executeQuery(query.str(), query_context, true, QueryProcessingStage::Complete).in; - - while (true) - { - Block block = input->read(); - if (!block || block.rows() == 0) - break; - partitions_rows += block.rows(); - - auto col = block.begin()->column; - for (size_t i = 0, n = col->size(); i< n; ++i) - { - Field field = (*col)[i]; - auto handle_id = field.get(); - auto it = std::upper_bound(handle_ranges_check.begin(), handle_ranges_check.end(), - std::make_pair(handle_id, std::numeric_limits::max())); - bool is_handle_not_found = false; - int idx; - if (it == handle_ranges_check.end()) - { - is_handle_not_found = true; - } - else - { - idx = it->second; - if (!(std::get<0>(handle_ranges[idx]) <= handle_id && std::get<1>(handle_ranges[idx]) > handle_id)) - is_handle_not_found = true; - } - - if (is_handle_not_found) - { - output("partition #" + toString(partition_id) + " handle " + toString(handle_id) + " not in the regions of partition"); - continue; - } - - Region * region = reinterpret_cast(std::get<2>(handle_ranges[idx])); - auto partition_regions_it = partition_regions.find(region->id()); - if (partition_regions_it == partition_regions.end()) - { - output("region not found in kvstore. region info: " + region->toString()); - } - else - { - if ((*partition_regions_it).second.get() != region) - output("region not match. region info:" + region->toString()); - } - } - } - } - - output("sum(patitions rows): " + toString(partitions_rows)); -} - } diff --git a/dbms/src/Debug/dbgFuncRegion.h b/dbms/src/Debug/dbgFuncRegion.h index 079dce31ddc..20cedb194a9 100644 --- a/dbms/src/Debug/dbgFuncRegion.h +++ b/dbms/src/Debug/dbgFuncRegion.h @@ -23,26 +23,11 @@ void dbgFuncRegionSnapshot(Context & context, const ASTs & args, DBGInvoker::Pri // Dump region-partition relationship // Usage: // ./storage-client.sh "DBGInvoke dump_region_partition()" -void dbgFuncRegionPartition(Context & context, const ASTs & args, DBGInvoker::Printer output); +void dbgFuncDumpRegion(Context& context, const ASTs& args, DBGInvoker::Printer output); // Remove region's data from partition // Usage: // ./storage-client.sh "DBGInvoke rm_region_data(region_id)" void dbgFuncRegionRmData(Context & context, const ASTs & args, DBGInvoker::Printer output); -// Check each partition: rows == sum(regions rows) -// Usage: -// ./storage-client.sh "DBGInvoke check_partition_region(database_name, table_name)" -void dbgFuncCheckPartitionRegionRows(Context & context, const ASTs & args, DBGInvoker::Printer output); - -// Scan rows not in the correct partitions -// Usage: -// ./storage-client.sh "DBGInvoke scan_partition_extra_rows(database_name, table_name)" -void dbgFuncScanPartitionExtraRows(Context & context, const ASTs & args, DBGInvoker::Printer output); - -// check region correct -// Usage: -// ./storage-client.sh "DBGInvoke check_region_correct(database_name, table_name)" -void dbgFuncCheckRegionCorrect(Context & context, const ASTs & args, DBGInvoker::Printer output); - } diff --git a/dbms/src/Debug/dbgTools.cpp b/dbms/src/Debug/dbgTools.cpp index d86b21b951c..f3db5a4e9cb 100644 --- a/dbms/src/Debug/dbgTools.cpp +++ b/dbms/src/Debug/dbgTools.cpp @@ -268,6 +268,7 @@ void batchInsert(const TiDB::TableInfo & table_info, std::unique_ptr } tmt.kvstore->onServiceCommand(cmds, raft_ctx); + tmt.region_table.tryFlushRegions(); } } @@ -279,9 +280,9 @@ void 
concurrentBatchInsert(const TiDB::TableInfo & table_info, Int64 concurrent_ RegionID curr_max_region_id(InvalidRegionID); HandleID curr_max_handle_id = 0; tmt.kvstore->traverseRegions( - [&](Region * region) { - curr_max_region_id = (curr_max_region_id == InvalidRegionID) ? region->id() : - std::max(curr_max_region_id, region->id()); + [&](const RegionID region_id, const RegionPtr & region) { + curr_max_region_id = (curr_max_region_id == InvalidRegionID) ? region_id : + std::max(curr_max_region_id, region_id); auto range = region->getRange(); curr_max_handle_id = std::max(RecordKVFormat::getHandle(range.second), curr_max_handle_id); }); @@ -318,7 +319,7 @@ Int64 concurrentRangeOperate(const TiDB::TableInfo & table_info, HandleID start_ for (UInt64 partition_id = 0; partition_id < partition_number; ++partition_id) { TMTContext & tmt = context.getTMTContext(); - tmt.region_partition.traverseRegionsByTablePartition(table_info.id, partition_id, [&](Regions d){ + tmt.region_table.traverseRegionsByTable(table_info.id, [&](Regions d) { regions.insert(regions.end(), d.begin(), d.end()); }); } diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index 4810ea34c92..0da6a0e058f 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -36,14 +36,14 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context, BackgroundProcessingPool::TaskHandle persist_handle; BackgroundProcessingPool::TaskHandle flush_handle; - RegionPartition & region_partition = db_context.getTMTContext().region_partition; + RegionTable & region_table = db_context.getTMTContext().region_table; try { kvstore->report(rctx); persist_handle = background_pool.addTask([&, this] { return kvstore->tryPersistAndReport(rctx); }); - flush_handle = background_pool.addTask([&] { return region_partition.tryFlushRegions(); }); + flush_handle = background_pool.addTask([&] { return region_table.tryFlushRegions(); }); enginepb::CommandRequestBatch request; while (stream->Read(&request)) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 68b8118d0c8..d76bc4b49d4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -38,6 +38,8 @@ namespace std #include #include #include +#include +#include #include #include @@ -143,7 +145,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows)); } -void extend_mutable_engine_column_names(Names& column_names_to_read, const MergeTreeData& data) +void extend_mutable_engine_column_names(Names& column_names_to_read, const MergeTreeData & data) { column_names_to_read.insert(column_names_to_read.end(), MutableSupport::version_column_name); column_names_to_read.insert(column_names_to_read.end(), MutableSupport::delmark_column_name); @@ -155,6 +157,61 @@ void extend_mutable_engine_column_names(Names& column_names_to_read, const Merge column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); } +MarkRanges MarkRangesFromRegionRange(const MergeTreeData::DataPart::Index & index, const Int64 handle_begin, + const Int64 handle_end, MarkRanges mark_ranges, size_t min_marks_for_seek, const Settings & settings) +{ + MarkRanges res; + + size_t marks_count = index.at(0)->size(); + if (marks_count 
== 0) + return res; + + reverse(mark_ranges.begin(), mark_ranges.end()); + MarkRanges ranges_stack = std::move(mark_ranges); + + Field index_left; + Field index_right; + + while (!ranges_stack.empty()) + { + MarkRange range = ranges_stack.back(); + ranges_stack.pop_back(); + + index[0]->get(range.begin, index_left); + + if (range.end != marks_count) + index[0]->get(range.end, index_right); + else + index_right = Field(std::numeric_limits::max()); + + Int64 index_left_handle = index_left.get(); + Int64 index_right_handle = index_right.get(); + + if (handle_begin >= index_right_handle || index_left_handle >= handle_end) + continue; + + if (range.end == range.begin + 1) + { + if (res.empty() || range.begin - res.back().end > min_marks_for_seek) + res.push_back(range); + else + res.back().end = range.end; + } + else + { + size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1; + size_t end; + + for (end = range.end; end > range.begin + step; end -= step) + ranges_stack.push_back(MarkRange(end - step, end)); + + ranges_stack.push_back(MarkRange(range.begin, end)); + } + } + + return res; +} + BlockInputStreams MergeTreeDataSelectExecutor::read( const Names & column_names_to_return, const SelectQueryInfo & query_info, @@ -542,17 +599,33 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( /// @todo Make sure partition select works properly when sampling is used! - NameSet specific_partitions; - bool use_specific_partitions = false; - if (const auto * select = typeid_cast(query_info.query.get())) + // TODO: set regions_query_info from setting. + + std::vector regions_query_info; + std::vector regions_query_res; + BlockInputStreams region_block_data; + String handle_col_name; + + if (is_txn_engine) { - if (select->partition_expression_list) - { - specific_partitions = data.getPartitionIDsInLiteral(select->partition_expression_list, context); - use_specific_partitions = true; - } + handle_col_name = data.primary_expr_ast->children[0]->getColumnName(); + + TMTContext & tmt = context.getTMTContext(); + + tmt.region_table.traverseRegionsByTable(data.table_info.id, [&](Regions regions){ + for (const auto & region : regions) + { + regions_query_info.push_back({region->id(), region->version(), region->getHandleRangeByTable(data.table_info.id)}); + } + }); } + std::sort(regions_query_info.begin(), regions_query_info.end()); + size_t region_cnt = regions_query_info.size(); + std::vector region_range_parts(region_cnt); + regions_query_res.assign(region_cnt, true); + region_block_data.assign(region_cnt, nullptr); + RangesInDataParts parts_with_ranges; /// Let's find what range to read from each part. @@ -568,8 +641,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}}; /// Make sure this part is in mark range and contained by valid partitions. 
- if (!ranges.ranges.empty() - && (specific_partitions.empty() || specific_partitions.find(part->partition.getID(data)) != specific_partitions.end())) + if (!ranges.ranges.empty()) { parts_with_ranges.push_back(ranges); @@ -579,8 +651,72 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } } - LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, " - << sum_marks << " marks to read from " << sum_ranges << " ranges"); + if (!is_txn_engine || select.raw_for_mutable) + LOG_DEBUG(log, "Selected " << parts.size() << " parts, " << parts_with_ranges.size() << " parts by key, " + << sum_marks << " marks to read from " << sum_ranges << " ranges"); + else + { + TMTContext & tmt = context.getTMTContext(); + + extend_mutable_engine_column_names(column_names_to_read, data); + + // get data block from region first. + std::vector rows_in_mem(region_cnt, 0); + + for (size_t region_index = 0; region_index < region_cnt; ++region_index) + { + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + + auto [region_input_stream, status, tol] = tmt.region_table.getBlockInputStreamByRegion( + data.table_info.id, region_query_info.region_id, region_query_info.version, + data.table_info, data.getColumns(), column_names_to_read, + true, query_info.resolve_locks, query_info.read_tso); + if (status != RegionTable::OK) + { + regions_query_res[region_index] = false; + LOG_INFO(log, "Region " << region_query_info.region_id << ", version " << regions_query_info[region_index].version + << ", handle range [" << regions_query_info[region_index].range_in_table.first + << ", " << regions_query_info[region_index].range_in_table.second << ") , status " + << RegionTable::RegionReadStatusString(status)); + continue; + } + region_block_data[region_index] = region_input_stream; + rows_in_mem[region_index] = tol; + } + + for (size_t region_index = 0; region_index < region_cnt; ++region_index) + { + if (!regions_query_res[region_index]) + continue; + + size_t sum_marks = 0; + size_t sum_ranges = 0; + for (const RangesInDataPart & ranges : parts_with_ranges) + { + MarkRanges mark_ranges = MarkRangesFromRegionRange(ranges.data_part->index, + regions_query_info[region_index].range_in_table.first, + regions_query_info[region_index].range_in_table.second, + ranges.ranges, + (settings.merge_tree_min_rows_for_seek + + data.index_granularity - 1) + / data.index_granularity, + settings); + if (mark_ranges.empty()) + continue; + region_range_parts[region_index].push_back({ranges.data_part, ranges.part_index_in_query, mark_ranges}); + sum_ranges += mark_ranges.size(); + for (const auto & range : mark_ranges) + sum_marks += range.end - range.begin; + } + + LOG_TRACE(log, "Region " << regions_query_info[region_index].region_id << ", version " + << regions_query_info[region_index].version + << ", handle range [" << regions_query_info[region_index].range_in_table.first + << ", " << regions_query_info[region_index].range_in_table.second << "), selected " + << region_range_parts[region_index].size() << " parts, " << sum_marks << " marks to read from " + << sum_ranges << " ranges, read " << rows_in_mem[region_index] << " rows from memory"); + } + } if (parts_with_ranges.empty() && !is_txn_engine) return {}; @@ -591,6 +727,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( BlockInputStreams res; + // TODO: By now, selraw will output all parts and all regions. Fix it if needed. 
if (is_txn_engine) { bool use_uncompressed_cache = settings.use_uncompressed_cache; @@ -614,15 +751,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (!select.no_kvstore) { - TMTContext &tmt = context.getTMTContext(); - for (UInt64 partition_id = 0; partition_id < data.settings.mutable_mergetree_partition_number; - ++partition_id) { - if (use_specific_partitions - && specific_partitions.find(toString(partition_id)) == specific_partitions.end()) + for (size_t region_index = 0; region_index < region_cnt; ++region_index) + { + if (!regions_query_res[region_index]) continue; - auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition( - data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read, - false, true, query_info.resolve_locks, query_info.read_tso); + auto region_input_stream = region_block_data[region_index]; if (region_input_stream) res.emplace_back(region_input_stream); } @@ -630,26 +763,23 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( } else { - extend_mutable_engine_column_names(column_names_to_read, data); + std::vector> region_streams(num_streams); - std::unordered_map partitions; - for (size_t part_index = 0; part_index < parts_with_ranges.size(); ++part_index) - { - const RangesInDataPart & part = parts_with_ranges[part_index]; - // TODO: use part.data_part->info.partition_id? - UInt64 partition_id = part.data_part->partition.value[0].get(); - partitions[partition_id].emplace_back(part); - } + for (size_t region_index = 0; region_index < region_cnt; ++region_index) + region_streams[region_index % num_streams].push_back(region_index); - TMTContext & tmt = context.getTMTContext(); - for (UInt64 partition_id = 0; partition_id < data.settings.mutable_mergetree_partition_number; ++partition_id) + for (const auto & region_idx_list : region_streams) { - BlockInputStreams merging; - const auto partition_it = partitions.find(partition_id); - if (partition_it != partitions.cend()) + BlockInputStreams union_regions_stream; + for (size_t region_index : region_idx_list) { - auto parts = partition_it->second; - for (const auto & part : parts) + if (!regions_query_res[region_index]) + continue; + + const RegionQueryInfo & region_query_info = regions_query_info[region_index]; + + BlockInputStreams merging; + for (const RangesInDataPart & part : region_range_parts[region_index]) { BlockInputStreamPtr source_stream = std::make_shared( data, part.data_part, max_block_size, settings.preferred_block_size_bytes, @@ -657,37 +787,36 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( use_uncompressed_cache, prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true, true, virt_column_names, part.part_index_in_query); - source_stream = std::make_shared(source_stream, - data.getPrimaryExpression()); + source_stream = std::make_shared(source_stream, region_query_info.range_in_table, handle_col_name); - BlockInputStreamPtr version_filtered_stream = std::make_shared( + source_stream = std::make_shared( source_stream, MutableSupport::version_column_name, query_info.read_tso); - merging.emplace_back(version_filtered_stream); - } - } - if (!select.no_kvstore && (!use_specific_partitions || - specific_partitions.find(toString(partition_id)) != specific_partitions.end())) - { - auto region_input_stream = tmt.region_partition.getBlockInputStreamByPartition( - data.table_info.id, partition_id, data.table_info, data.getColumns(), column_names_to_read, - false, true, 
query_info.resolve_locks, query_info.read_tso); + source_stream = std::make_shared(source_stream, + data.getPrimaryExpression()); + + merging.emplace_back(source_stream); + } + auto region_input_stream = region_block_data[region_index]; if (region_input_stream) { - BlockInputStreamPtr version_filtered_stream = std::make_shared( + region_input_stream = std::make_shared( region_input_stream, MutableSupport::version_column_name, query_info.read_tso); - merging.emplace_back(version_filtered_stream); - } - } - if (merging.size()) - { - res.emplace_back(std::make_shared( - merging, data.getPrimarySortDescription(), - MutableSupport::version_column_name, MutableSupport::delmark_column_name, - DEFAULT_MERGE_BLOCK_SIZE, - query_info.read_tso)); + region_input_stream = std::make_shared(region_input_stream, + data.getPrimaryExpression()); + merging.emplace_back(region_input_stream); + } + if (merging.size()) + union_regions_stream.emplace_back( + std::make_shared( + merging, data.getPrimarySortDescription(), + MutableSupport::version_column_name, MutableSupport::delmark_column_name, + DEFAULT_MERGE_BLOCK_SIZE, + query_info.read_tso)); } + if (union_regions_stream.size()) + res.emplace_back(std::make_shared>(union_regions_stream, nullptr, 1)); } } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 279f94d3698..cc583756040 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -106,8 +106,10 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block PODArray partition_num_to_first_row; PODArray partition_num_to_key; IColumn::Selector selector; - size_t partition_mod = (data.merging_params.mode != MergeTreeData::MergingParams::Mutable && data.merging_params.mode != MergeTreeData::MergingParams::Txn) - ? 
1 : size_t(data.settings.mutable_mergetree_partition_number); + + size_t partition_mod = 1; + if (data.merging_params.mode == MergeTreeData::MergingParams::Mutable) + partition_mod = size_t(data.settings.mutable_mergetree_partition_number); buildScatterSelector(partition_columns, partition_mod, partition_num_to_first_row, partition_num_to_key, selector); size_t partitions_count = partition_num_to_first_row.size(); diff --git a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h index 98e87814c93..8b887a6442c 100644 --- a/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/TxnMergeTreeBlockOutputStream.h @@ -13,7 +13,7 @@ class StorageMergeTree; class TxnMergeTreeBlockOutputStream : public IBlockOutputStream { public: - TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_) : + TxnMergeTreeBlockOutputStream(StorageMergeTree & storage_, UInt64 partition_id_ = 0) : storage(storage_), log(&Logger::get("TxnMergeTreeBlockOutputStream")), partition_id(partition_id_) { } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 3422003509a..62a6e0d23fe 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -182,23 +182,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Setting if (data.merging_params.mode == MergeTreeData::MergingParams::Txn) { - NameSet partitions; - if (insert_query && insert_query->partition_expression_list) - partitions = data.getPartitionIDsInLiteral(insert_query->partition_expression_list, context); - else if (delete_query && delete_query->partition_expression_list) - partitions = data.getPartitionIDsInLiteral(delete_query->partition_expression_list, context); - if (partitions.size() > 1) - throw Exception("INSERT into a TMT table should only specify one partition.", ErrorCodes::BAD_ARGUMENTS); - - if (partitions.size() == 1) - { - UInt64 partition_id = parse(*(partitions.begin())); - res = std::make_shared(*this, partition_id); - } - else if (delete_query) - { - throw Exception("DELETE from a TMT table should specify one partition.", ErrorCodes::BAD_ARGUMENTS); - } + res = std::make_shared(*this); if (insert_query) { diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index bd4bc5f900c..756f679c132 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -10,27 +10,20 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -// TODO move to Settings.h -static Seconds REGION_PERSIST_PERIOD(60); // 1 minutes -static Seconds KVSTORE_TRY_PERSIST_PERIOD(10); // 10 seconds - -KVStore::KVStore(const std::string & data_dir, Context * context) : region_persister(data_dir), log(&Logger::get("KVStore")) +KVStore::KVStore(const std::string & data_dir, Context *, std::vector * regions_to_remove) : region_persister(data_dir), log(&Logger::get("KVStore")) { std::lock_guard lock(mutex); region_persister.restore(regions); // Remove regions which pending_remove = true, those regions still exist because progress crash after persisted and before removal. 
- std::vector to_remove; - for (auto p : regions) - { - RegionPtr & region = p.second; - if (region->isPendingRemove()) - to_remove.push_back(region->id()); - } - for (auto & region_id : to_remove) + if (regions_to_remove != nullptr) { - LOG_INFO(log, "Region [" << region_id << "] is removed after restored."); - removeRegion(region_id, context); + for (auto & p : regions) + { + RegionPtr & region = p.second; + if (region->isPendingRemove()) + regions_to_remove->push_back(region->id()); + } } } @@ -41,11 +34,17 @@ RegionPtr KVStore::getRegion(RegionID region_id) return (it == regions.end()) ? nullptr : it->second; } -void KVStore::traverseRegions(std::function callback) +const RegionMap & KVStore::getRegions() +{ + std::lock_guard lock(mutex); + return regions; +} + +void KVStore::traverseRegions(std::function callback) { std::lock_guard lock(mutex); for (auto it = regions.begin(); it != regions.end(); ++it) - callback(it->second.get()); + callback(it->first, it->second); } void KVStore::onSnapshot(const RegionPtr & region, Context * context) @@ -53,9 +52,6 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context) TMTContext * tmt_ctx = (bool)(context) ? &(context->getTMTContext()) : nullptr; auto region_id = region->id(); - // Remove old region data in partition before apply snapshot. - // TODO support atomic remove & insert new region. - RegionPtr old_region; { std::lock_guard lock(mutex); @@ -64,8 +60,8 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context) old_region = it->second; } - if (tmt_ctx && old_region) - tmt_ctx->region_partition.removeRegion(old_region); + if (old_region != nullptr && old_region->getIndex() >= region->getIndex()) + return; region_persister.persist(region); @@ -75,7 +71,7 @@ void KVStore::onSnapshot(const RegionPtr & region, Context * context) } if (tmt_ctx) - tmt_ctx->region_partition.applySnapshotRegion(region); + tmt_ctx->region_table.applySnapshotRegion(region); } void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftContext & raft_ctx) @@ -108,18 +104,6 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC curr_region = it->second; } - if (curr_region->isPendingRemove()) - { - // Normally this situation should not exist. Unless some exceptions throw during former removeRegion. - LOG_DEBUG(log, curr_region->toString() << " (before cmd) is in pending remove status, remove it now."); - removeRegion(curr_region_id, context); - - LOG_INFO(log, "Sync status because of removal: " << curr_region->toString(true)); - *(responseBatch.mutable_responses()->Add()) = curr_region->toCommandResponse(); - - continue; - } - if (header.destroy()) { LOG_INFO(log, curr_region->toString() << " is removed by tombstone."); @@ -134,9 +118,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC continue; } - auto before_cache_bytes = curr_region->dataSize(); - - auto [new_region, split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback); + auto [split_regions, table_ids, sync] = curr_region->onCommand(cmd, callback); if (curr_region->isPendingRemove()) { @@ -149,16 +131,11 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC continue; } - if (!split_regions.empty()) { - // Persist current region and split regions, and mange data in partition - // Add to regions map so that queries can see them. - // TODO: support atomic or idempotent operation. 
{ std::lock_guard lock(mutex); - curr_region->swap(*new_region); for (const auto & region : split_regions) { auto [it, ok] = regions.emplace(region->id(), region); @@ -170,17 +147,19 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC } } - if (tmt_ctx) - tmt_ctx->region_partition.splitRegion(curr_region, split_regions); + { + region_persister.persist(curr_region); + for (const auto & region : split_regions) + region_persister.persist(region); + } - region_persister.persist(curr_region); - for (const auto & region : split_regions) - region_persister.persist(region); + if (tmt_ctx) + tmt_ctx->region_table.splitRegion(curr_region, split_regions); } else { if (tmt_ctx) - tmt_ctx->region_partition.updateRegion(curr_region, before_cache_bytes, table_ids); + tmt_ctx->region_table.updateRegion(curr_region, table_ids); if (sync) region_persister.persist(curr_region); @@ -191,7 +170,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC LOG_INFO(log, "Sync status: " << curr_region->toString(true)); *(responseBatch.mutable_responses()->Add()) = curr_region->toCommandResponse(); - for (auto & region : split_regions) + for (const auto & region : split_regions) *(responseBatch.mutable_responses()->Add()) = region->toCommandResponse(); } } @@ -213,35 +192,42 @@ void KVStore::report(RaftContext & raft_ctx) raft_ctx.send(responseBatch); } -bool KVStore::tryPersistAndReport(RaftContext & context) +bool KVStore::tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period, const Seconds region_persist_period) { - std::lock_guard lock(mutex); - Timepoint now = Clock::now(); - if (now < (last_try_persist_time + KVSTORE_TRY_PERSIST_PERIOD)) + if (now < (last_try_persist_time.load() + kvstore_try_persist_period)) return false; last_try_persist_time = now; bool persist_job = false; enginepb::CommandResponseBatch responseBatch; - for (const auto & p : regions) - { - const auto region = p.second; - if (now < (region->lastPersistTime() + REGION_PERSIST_PERIOD)) - continue; + RegionMap all_region_copy; + traverseRegions([&](const RegionID region_id, const RegionPtr & region) { + if (now < (region->lastPersistTime() + region_persist_period)) + return; + if (region->persistParm() == 0) + return; + all_region_copy[region_id] = region; + }); + + std::stringstream ss; + + for (auto && [region_id, region] : all_region_copy) + { persist_job = true; + region_persister.persist(region); - region->markPersisted(); - LOG_TRACE(log, "Region " << region->id() << " report status"); + ss << "(" << region_id << "," << region->persistParm() << ") "; *(responseBatch.mutable_responses()->Add()) = region->toCommandResponse(); } if (persist_job) { - LOG_INFO(log, "Batch report regions status"); + LOG_TRACE(log, "Regions " << ss.str() << "report status"); + LOG_TRACE(log, "Batch report regions status"); context.send(responseBatch); } @@ -262,7 +248,7 @@ void KVStore::removeRegion(RegionID region_id, Context * context) region_persister.drop(region_id); if (context) - context->getTMTContext().region_partition.removeRegion(region); + context->getTMTContext().region_table.removeRegion(region); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 553dfa056ed..217a8fa9120 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -17,13 +17,17 @@ namespace DB { +// TODO move to Settings.h +static const Seconds REGION_PERSIST_PERIOD(120); // 2 minutes 
+static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds + /// TODO: brief design document. class KVStore final : private boost::noncopyable { public: - KVStore(const std::string & data_dir, Context * context = nullptr); + KVStore(const std::string & data_dir, Context * context = nullptr, std::vector * regions_to_remove = nullptr); RegionPtr getRegion(RegionID region_id); - void traverseRegions(std::function callback); + void traverseRegions(std::function callback); void onSnapshot(const RegionPtr & region, Context * context); // TODO: remove RaftContext and use Context + CommandServerReaderWriter @@ -34,12 +38,11 @@ class KVStore final : private boost::noncopyable // Persist and report those expired regions. // Currently we also trigger region files GC in it. - bool tryPersistAndReport(RaftContext & context); + bool tryPersistAndReport(RaftContext & context, const Seconds kvstore_try_persist_period=KVSTORE_TRY_PERSIST_PERIOD, + const Seconds region_persist_period=REGION_PERSIST_PERIOD); - // For test, please do NOT remove. - RegionMap & _regions() { return regions; } + const RegionMap & getRegions(); -private: void removeRegion(RegionID region_id, Context * context); private: @@ -49,7 +52,7 @@ class KVStore final : private boost::noncopyable std::mutex mutex; Consistency consistency; - Timepoint last_try_persist_time = Clock::now(); + std::atomic last_try_persist_time = Clock::now(); Logger * log; }; diff --git a/dbms/src/Storages/Transaction/PartitionDataMover.cpp b/dbms/src/Storages/Transaction/PartitionDataMover.cpp index ad391b722ee..48efa08446a 100644 --- a/dbms/src/Storages/Transaction/PartitionDataMover.cpp +++ b/dbms/src/Storages/Transaction/PartitionDataMover.cpp @@ -19,8 +19,8 @@ namespace ErrorCodes namespace PartitionDataMover { -BlockInputStreamPtr createBlockInputStreamFromRangeInPartition(const Context & context, StorageMergeTree * storage, - UInt64 partition_id, const Field & begin, const Field & excluded_end) +BlockInputStreamPtr createBlockInputStreamFromRange(const Context& context, StorageMergeTree* storage, + const HandleID begin, const HandleID excluded_end) { SortDescription pk_columns = storage->getData().getPrimarySortDescription(); // TODO: make sure PKs are all uint64 @@ -34,13 +34,12 @@ BlockInputStreamPtr createBlockInputStreamFromRangeInPartition(const Context & c throw Exception("PartitionDataMover: primary key should be Int64", ErrorCodes::LOGICAL_ERROR); std::stringstream ss; - ss << "SELRAW NOKVSTORE * FROM " << storage->getDatabaseName() << "." << storage->getTableName() << - " PARTITION ('" << partition_id << "') WHERE (" << - applyVisitor(FieldVisitorToString(), begin) << " <= " << pk_name << ") AND (" << - pk_name << " < " + applyVisitor(FieldVisitorToString(), excluded_end) << ")"; + ss << "SELRAW NOKVSTORE * FROM " << storage->getDatabaseName() << "." 
<< storage->getTableName() << " WHERE (" << + begin << " <= " << pk_name << ") AND (" << + pk_name << " < " << excluded_end << ")"; std::string query = ss.str(); - LOG_DEBUG(&Logger::get("PartitionDataMover"), "createBlockInputStreamFromRangeInPartition sql: " << query); + LOG_DEBUG(&Logger::get("PartitionDataMover"), "createBlockInputStreamFromRange sql: " << query); Context query_context = context; query_context.setSessionContext(query_context); @@ -110,25 +109,14 @@ void increaseVersionInBlock(Block & block, size_t increasement = 1) } // namespace PartitionDataMover -std::pair getRegionRangeField(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id) -{ - // Example: - // Range: [100_10, 200_5), table_id: 100, then start_handle: 10, end_handle: MAX_HANDLE_ID - - HandleID start_handle = TiKVRange::getRangeHandle(start_key, table_id); - HandleID end_handle = TiKVRange::getRangeHandle(end_key, table_id); - - return {Field(start_handle), Field(end_handle)}; -} - -void deleteRangeInPartition(const Context & context, StorageMergeTree * storage, - UInt64 partition_id, const Field & begin, const Field & excluded_end) +void deleteRange(const Context& context, StorageMergeTree* storage, + const HandleID begin, const HandleID excluded_end) { auto table_lock = storage->lockStructure(true, __PRETTY_FUNCTION__); - BlockInputStreamPtr input = PartitionDataMover::createBlockInputStreamFromRangeInPartition( - context, storage, partition_id, begin, excluded_end); - TxnMergeTreeBlockOutputStream output(*storage, partition_id); + BlockInputStreamPtr input = PartitionDataMover::createBlockInputStreamFromRange( + context, storage, begin, excluded_end); + TxnMergeTreeBlockOutputStream output(*storage); output.writePrefix(); @@ -144,6 +132,7 @@ void deleteRangeInPartition(const Context & context, StorageMergeTree * storage, output.writeSuffix(); } +/* // TODO: use `new_ver = old_ver+1` to delete data is not a good way, may conflict with data in the future void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage, UInt64 src_partition_id, UInt64 dest_partition_id, const Field & begin, const Field & excluded_end) @@ -186,5 +175,6 @@ void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * stor dest_partition_id << ", range: [" << applyVisitor(FieldVisitorToString(), begin) << ", " << applyVisitor(FieldVisitorToString(), excluded_end) << ")"); } +*/ } diff --git a/dbms/src/Storages/Transaction/PartitionDataMover.h b/dbms/src/Storages/Transaction/PartitionDataMover.h index b6f4d8c00ab..8020798698a 100644 --- a/dbms/src/Storages/Transaction/PartitionDataMover.h +++ b/dbms/src/Storages/Transaction/PartitionDataMover.h @@ -9,16 +9,16 @@ namespace DB { -std::pair getRegionRangeField(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id); /// Remove range from this partition. /// Note that [begin, excluded_end) is not necessarily to locate in the range of this partition (or table). -void deleteRangeInPartition(const Context & context, StorageMergeTree * storage, - UInt64 partition_id, const Field & begin, const Field & excluded_end); +void deleteRange(const Context& context, StorageMergeTree* storage, + const HandleID begin, const HandleID excluded_end); +/* /// Move data in [begin, excluded_end) from src_partition_id to dest_partition_id. /// FIXME/TODO: currently this function is not atomic and need to fix. 
void moveRangeBetweenPartitions(const Context & context, StorageMergeTree * storage, UInt64 src_partition_id, UInt64 dest_partition_id, const Field & begin, const Field & excluded_end); - +*/ } diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 0edd13f4514..91d0c9ddaa1 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include @@ -12,73 +12,66 @@ namespace DB { -BlockInputStreamPtr RegionPartition::getBlockInputStreamByPartition(TableID table_id, UInt64 partition_id, - const TiDB::TableInfo & table_info, const ColumnsDescription & columns, const Names & ordered_columns, - bool remove_on_read, bool learner_read, bool resolve_locks, UInt64 start_ts) +std::tuple RegionTable::getBlockInputStreamByRegion( + TableID table_id, + const RegionID region_id, + const RegionVersion region_version, + const TiDB::TableInfo & table_info, + const ColumnsDescription & columns, + const Names & ordered_columns, + bool learner_read, + bool resolve_locks, + UInt64 start_ts, + std::vector * keys) { auto & kvstore = context.getTMTContext().kvstore; - Regions partition_regions; - { - std::lock_guard lock(mutex); - - auto & table = getOrCreateTable(table_id); - auto & partition = table.partitions.get()[partition_id]; - - for (auto & region_id : partition.region_ids) - { - auto region = kvstore->getRegion(region_id); - if (!region) - throw Exception("Region " + DB::toString(region_id) + " not found!", ErrorCodes::LOGICAL_ERROR); - partition_regions.push_back(region); - } - } - if (partition_regions.empty()) - return {}; + auto region = kvstore->getRegion(region_id); + if (!region) + return {nullptr, NOT_FOUND, 0}; if (learner_read) - { - for (const auto & region : partition_regions) - { - region->wait_index(region->learner_read()); - } - } + region->waitIndex(region->learnerRead()); auto schema_fetcher = [&](TableID) { return std::make_tuple(&table_info, &columns, &ordered_columns); }; - RegionBlockReader reader(schema_fetcher, std::move(partition_regions), table_id, remove_on_read, resolve_locks, start_ts); - - BlocksList blocks; - Block block; - TableID current_table_id; - RegionID current_region_id; - Region::LockInfoPtr lock_info; - Region::LockInfos lock_infos; - while (true) { - std::tie(block, current_table_id, current_region_id, lock_info) = reader.next(); - if (lock_info) + auto scanner = region->createCommittedScanRemover(table_id); + + if (region->isPendingRemove()) + return {nullptr, PENDING_REMOVE, 0}; + + if (region_version != InvalidRegionVersion && region->version() != region_version) + return {nullptr, VERSION_ERROR, 0}; + { - lock_infos.emplace_back(std::move(lock_info)); - continue; + Region::LockInfoPtr lock_info = nullptr; + if (resolve_locks) + lock_info = scanner->getLockInfo(table_id, start_ts); + if (lock_info) + { + Region::LockInfos lock_infos; + lock_infos.emplace_back(std::move(lock_info)); + throw LockException(std::move(lock_infos)); + } } - if (!block || block.rows() == 0) - break; - if (table_id != current_table_id) - throw Exception("RegionPartition::getBlockInputStreamByPartition", ErrorCodes::LOGICAL_ERROR); - blocks.push_back(block); - } - if (!lock_infos.empty()) - throw LockException(std::move(lock_infos)); + auto next_table_id = scanner->hasNext(); + if (next_table_id == InvalidTableID) + return {nullptr, OK, 0}; - if (blocks.empty()) - return {}; + const auto 
[table_info, columns, ordered_columns] = schema_fetcher(next_table_id); + auto block = RegionBlockRead(*table_info, *columns, *ordered_columns, scanner, keys); - return std::make_shared(std::move(blocks)); -} + size_t tol = block.rows(); + + BlocksList blocks; + blocks.emplace_back(std::move(block)); + return {std::make_shared(std::move(blocks)), OK, tol}; + } +} -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index fa40d1d2a97..069d0cb9edc 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -46,11 +46,14 @@ Region::KVMap::iterator Region::removeDataByWriteIt(const KVMap::iterator & writ return write_cf.erase(write_it); } -Region::ReadInfo Region::readDataByWriteIt(const KVMap::iterator & write_it) +Region::ReadInfo Region::readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys) { auto & write_key = write_it->first; auto & write_value = write_it->second; + if (keys) + (*keys).push_back(write_key); + auto [write_type, prewrite_ts, short_value] = RecordKVFormat::decodeWriteCfValue(write_value); String decode_key = std::get<0>(RecordKVFormat::decodeTiKVKey(write_key)); @@ -103,6 +106,21 @@ TableID Region::insert(const std::string & cf, const TiKVKey & key, const TiKVVa return doInsert(cf, key, value); } +void Region::batchInsert(std::function f) +{ + std::lock_guard lock(mutex); + for (;;) + { + if (BatchInsertNode p; f(p)) + { + auto && [k, v, cf] = p; + doInsert(*cf, *k, *v); + } + else + break; + } +} + TableID Region::doInsert(const std::string & cf, const TiKVKey & key, const TiKVValue & value) { // Ignoring all keys other than records. @@ -160,52 +178,63 @@ TableID Region::doRemove(const std::string & cf, const TiKVKey & key) return table_id; } -UInt64 Region::getIndex() { return meta.appliedIndex(); } +UInt64 Region::getIndex() const { return meta.appliedIndex(); } -RegionPtr Region::splitInto(const RegionMeta & meta) const +RegionPtr Region::splitInto(const RegionMeta & meta) { auto [start_key, end_key] = meta.getRange(); RegionPtr new_region; if (client != nullptr) - { new_region = std::make_shared( meta, std::make_shared(client->cache, client->client, meta.getRegionVerID())); - } else - { new_region = std::make_shared(meta); - } - for (auto && [key, value] : data_cf) + std::lock_guard lock(mutex); + + for (auto it = data_cf.begin(); it != data_cf.end(); ) { - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); + bool ok = start_key ? it->first >= start_key : true; + ok = ok && (end_key ? it->first < end_key : true); if (ok) { - new_region->data_cf.emplace(key, value); - new_region->cf_data_size += key.dataSize() + value.dataSize(); + cf_data_size -= it->first.dataSize() + it->second.dataSize(); + new_region->cf_data_size += it->first.dataSize() + it->second.dataSize(); + + new_region->data_cf.insert(std::move(*it)); + it = data_cf.erase(it); } + else + ++it; } - for (auto && [key, value] : write_cf) + for (auto it = write_cf.begin(); it != write_cf.end(); ) { - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); + bool ok = start_key ? it->first >= start_key : true; + ok = ok && (end_key ? 
it->first < end_key : true); if (ok) { - new_region->write_cf.emplace(key, value); - new_region->cf_data_size += key.dataSize() + value.dataSize(); + cf_data_size -= it->first.dataSize() + it->second.dataSize(); + new_region->cf_data_size += it->first.dataSize() + it->second.dataSize(); + + new_region->write_cf.insert(std::move(*it)); + it = write_cf.erase(it); } + else + ++it; } - for (auto && [key, value] : lock_cf) + for (auto it = lock_cf.begin(); it != lock_cf.end(); ) { - bool ok = start_key ? key >= start_key : true; - ok = ok && (end_key ? key < end_key : true); + bool ok = start_key ? it->first >= start_key : true; + ok = ok && (end_key ? it->first < end_key : true); if (ok) { - new_region->lock_cf.emplace(key, value); + new_region->lock_cf.insert(std::move(*it)); + it = lock_cf.erase(it); } + else + ++it; } return new_region; @@ -213,26 +242,29 @@ RegionPtr Region::splitInto(const RegionMeta & meta) const void Region::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response) { - const auto & change_peer = request.change_peer(); + const auto & change_peer_request = request.change_peer(); const auto & new_region = response.change_peer().region(); - LOG_INFO(log, toString() << " change peer " << eraftpb::ConfChangeType_Name(change_peer.change_type())); + LOG_INFO(log, toString() << " change peer " << eraftpb::ConfChangeType_Name(change_peer_request.change_type())); - switch (change_peer.change_type()) + switch (change_peer_request.change_type()) { case eraftpb::ConfChangeType::AddNode: case eraftpb::ConfChangeType::AddLearnerNode: + { + // change the peers of region, add conf_ver. meta.setRegion(new_region); return; + } case eraftpb::ConfChangeType::RemoveNode: { - const auto & peer = change_peer.peer(); + const auto & peer = change_peer_request.peer(); + auto store_id = peer.store_id(); + + meta.removePeer(store_id); + if (meta.peerId() == peer.id()) - { - // Remove ourself, we will destroy all region data later. - // So we need not to apply following logs. 
- meta.setPendingRemove(); - } + setPendingRemove(); return; } default: @@ -240,7 +272,7 @@ void Region::execChangePeer(const raft_cmdpb::AdminRequest & request, const raft } } -const metapb::Peer & findPeer(const metapb::Region & region, UInt64 store_id) +const metapb::Peer & FindPeer(const metapb::Region & region, UInt64 store_id) { for (const auto & peer : region.peers()) { @@ -250,7 +282,7 @@ const metapb::Peer & findPeer(const metapb::Region & region, UInt64 store_id) throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } -std::pair Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response) +Regions Region::execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response) { const auto & split_reqs = request.splits(); const auto & new_region_infos = response.splits().regions(); @@ -262,60 +294,60 @@ std::pair Region::execBatchSplit(const raft_cmdpb::AdminRequ } std::vector split_regions; - auto store_id = meta.storeId(); - RegionPtr new_region; for (const auto & region_info : new_region_infos) { - if (region_info.id() == meta.regionId()) - { - RegionMeta new_meta(meta.getPeer(), region_info, meta.getApplyState()); - new_region = splitInto(new_meta); - } - else + if (region_info.id() != meta.regionId()) { - const auto & peer = findPeer(region_info, store_id); + const auto & peer = FindPeer(region_info, meta.storeId()); RegionMeta new_meta(peer, region_info, initialApplyState()); auto split_region = splitInto(new_meta); split_regions.emplace_back(split_region); } } + for (const auto & region_info : new_region_infos) + { + if (region_info.id() == meta.regionId()) + { + RegionMeta new_meta(meta.getPeer(), region_info, meta.getApplyState()); + meta.swap(new_meta); + break; + } + } + std::string ids; for (const auto & region : split_regions) ids += DB::toString(region->id()) + ","; - ids += DB::toString(new_region->id()); + ids += id(); LOG_INFO(log, toString() << " split into [" << ids << "]"); - return {new_region, split_regions}; + return split_regions; } -std::tuple, TableIDSet, bool> Region::onCommand( +std::tuple, TableIDSet, bool> Region::onCommand( const enginepb::CommandRequest & cmd, CmdCallBack & /*callback*/) { auto & header = cmd.header(); - RegionID region_id = header.region_id(); + RegionID region_id = id(); UInt64 term = header.term(); UInt64 index = header.index(); bool sync_log = header.sync_log(); - RegionPtr new_region; std::vector split_regions; - if (meta.isPendingRemove()) - throw Exception("Pending remove flag should be false", ErrorCodes::LOGICAL_ERROR); - if (term == 0 && index == 0) { if (!sync_log) throw Exception("sync_log should be true", ErrorCodes::LOGICAL_ERROR); - return {{}, {}, {}, true}; + return {{}, {}, sync_log}; } if (!checkIndex(index)) - return {{}, {}, {}, false}; + return {{}, {}, false}; TableIDSet table_ids; + bool need_persist = true; if (cmd.has_admin_request()) { @@ -333,12 +365,13 @@ std::tuple, TableIDSet, bool> Region::onComman execChangePeer(request, response); break; case raft_cmdpb::AdminCmdType::BatchSplit: - std::tie(new_region, split_regions) = execBatchSplit(request, response); + split_regions = execBatchSplit(request, response); break; case raft_cmdpb::AdminCmdType::CompactLog: case raft_cmdpb::AdminCmdType::ComputeHash: case raft_cmdpb::AdminCmdType::VerifyHash: // Ignore + need_persist = false; break; default: LOG_ERROR(log, "Unsupported admin command type " << 
raft_cmdpb::AdminCmdType_Name(type)); @@ -351,10 +384,6 @@ std::tuple, TableIDSet, bool> Region::onComman { auto type = req.cmd_type(); - LOG_TRACE(log, - "Region [" << region_id << "] execute command " << raft_cmdpb::CmdType_Name(type) << " at [term: " << term - << ", index: " << index << "]"); - switch (type) { case raft_cmdpb::CmdType::Put: @@ -379,23 +408,27 @@ std::tuple, TableIDSet, bool> Region::onComman case raft_cmdpb::CmdType::Snap: case raft_cmdpb::CmdType::Get: LOG_WARNING(log, "Region [" << region_id << "] skip unsupported command: " << raft_cmdpb::CmdType_Name(type)); + need_persist = false; break; case raft_cmdpb::CmdType::Prewrite: case raft_cmdpb::CmdType::Invalid: default: LOG_ERROR(log, "Unsupported command type " << raft_cmdpb::CmdType_Name(type)); + need_persist = false; break; } } } - LOG_TRACE(log, toString() << " advance applied index " << index << " and term " << term); - meta.setApplied(index, term); - if (new_region) - (*new_region).meta.setApplied(index, term); - return {new_region, split_regions, table_ids, sync_log}; + if (need_persist) + ++persist_parm; + + for (auto & region : split_regions) + region->last_persist_time.store(last_persist_time); + + return {split_regions, table_ids, sync_log}; } size_t Region::serialize(WriteBuffer & buf) @@ -522,16 +555,24 @@ size_t Region::dataSize() const { return cf_data_size; } void Region::markPersisted() { - std::lock_guard lock(mutex); last_persist_time = Clock::now(); } Timepoint Region::lastPersistTime() const { - std::lock_guard lock(mutex); return last_persist_time; } +size_t Region::persistParm() const +{ + return persist_parm; +} + +void Region::updatePersistParm(size_t x) +{ + persist_parm -= x; +} + std::unique_ptr Region::createCommittedScanRemover(TableID expected_table_id) { return std::make_unique(this->shared_from_this(), expected_table_id); @@ -543,20 +584,46 @@ enginepb::CommandResponse Region::toCommandResponse() const { return meta.toComm RegionRange Region::getRange() const { return meta.getRange(); } -UInt64 Region::learner_read() +UInt64 Region::learnerRead() { if (client != nullptr) return client->getReadIndex(); return 0; } -void Region::wait_index(UInt64 index) +void Region::waitIndex(UInt64 index) { if (client != nullptr) { - LOG_TRACE(log, "begin to wait learner index : " + std::to_string(index)); - meta.wait_index(index); + LOG_TRACE(log, "Region " << id() << " begin to wait learner index: " << index); + meta.waitIndex(index); + LOG_TRACE(log, "Region " << id() << " wait learner index done"); } } +UInt64 Region::version() const { return meta.version(); } + +UInt64 Region::confVer() const { return meta.confVer(); } + +std::pair Region::getHandleRangeByTable(TableID table_id) const +{ + return ::DB::getHandleRangeByTable(getRange(), table_id); +} + +std::pair getHandleRangeByTable(const std::pair & range, TableID table_id) +{ + return getHandleRangeByTable(range.first, range.second, table_id); +} + +std::pair getHandleRangeByTable(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id) +{ + // Example: + // Range: [100_10, 200_5), table_id: 100, then start_handle: 10, end_handle: MAX_HANDLE_ID + + HandleID start_handle = TiKVRange::getRangeHandle(start_key, table_id); + HandleID end_handle = TiKVRange::getRangeHandle(end_key, table_id); + + return {start_handle, end_handle}; +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 137c3723db6..42327acd268 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ 
b/dbms/src/Storages/Transaction/Region.h @@ -21,6 +21,28 @@ namespace DB class Region; using RegionPtr = std::shared_ptr; using Regions = std::vector; +using HandleRange = std::pair; + +struct RegionQueryInfo +{ + RegionID region_id; + UInt64 version; + HandleRange range_in_table; + + bool operator < (const RegionQueryInfo & o) const + { + return range_in_table < o.range_in_table; + } + + bool operator == (const RegionQueryInfo & o) const + { + return range_in_table == o.range_in_table; + } +}; + +std::pair getHandleRangeByTable(const TiKVKey & start_key, const TiKVKey & end_key, TableID table_id); + +std::pair getHandleRangeByTable(const std::pair & range, TableID table_id); /// Store all kv data of one region. Including 'write', 'data' and 'lock' column families. /// TODO: currently the synchronize mechanism is broken and need to fix. @@ -97,7 +119,7 @@ class Region : public std::enable_shared_from_this return InvalidTableID; } - auto next() { return store->readDataByWriteIt(write_map_it++); } + auto next(std::vector * keys = nullptr) { return store->readDataByWriteIt(write_map_it++, keys); } void remove(TableID remove_table_id) { @@ -110,6 +132,14 @@ class Region : public std::enable_shared_from_this } } + void remove(const TiKVKey & key) + { + if (auto it = store->write_cf.find(key); it != store->write_cf.end()) + { + store->removeDataByWriteIt(it); + } + } + LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts) { return store->getLockInfo(expected_table_id, start_ts); } private: @@ -136,7 +166,10 @@ class Region : public std::enable_shared_from_this TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); TableID remove(const std::string & cf, const TiKVKey & key); - std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & persis); + using BatchInsertNode = std::tuple; + void batchInsert(std::function f); + + std::tuple, TableIDSet, bool> onCommand(const enginepb::CommandRequest & cmd, CmdCallBack & persis); std::unique_ptr createCommittedScanRemover(TableID expected_table_id); @@ -158,21 +191,8 @@ class Region : public std::enable_shared_from_this void markPersisted(); Timepoint lastPersistTime() const; - - void swap(Region & other) - { - std::lock_guard lock1(mutex); - std::lock_guard lock2(other.mutex); - - meta.swap(other.meta); - - data_cf.swap(other.data_cf); - write_cf.swap(other.write_cf); - lock_cf.swap(other.lock_cf); - - cf_data_size = size_t(other.cf_data_size); - other.cf_data_size = size_t(cf_data_size); - } + size_t persistParm() const; + void updatePersistParm(size_t x); friend bool operator==(const Region & region1, const Region & region2) { @@ -183,11 +203,16 @@ class Region : public std::enable_shared_from_this && region1.lock_cf == region2.lock_cf && region1.cf_data_size == region2.cf_data_size; } - UInt64 learner_read(); + UInt64 learnerRead(); - void wait_index(UInt64 index); + void waitIndex(UInt64 index); - UInt64 getIndex(); + UInt64 getIndex() const; + + RegionVersion version() const; + RegionVersion confVer() const; + + std::pair getHandleRangeByTable(TableID table_id) const; private: // Private methods no need to lock mutex, normally @@ -199,13 +224,13 @@ class Region : public std::enable_shared_from_this KVMap & getCf(const std::string & cf); using ReadInfo = std::tuple; - ReadInfo readDataByWriteIt(const KVMap::iterator & write_it); + ReadInfo readDataByWriteIt(const KVMap::iterator & write_it, std::vector * keys=nullptr); KVMap::iterator removeDataByWriteIt(const 
KVMap::iterator & write_it); LockInfoPtr getLockInfo(TableID expected_table_id, UInt64 start_ts); - RegionPtr splitInto(const RegionMeta & meta) const; - std::pair execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response); + RegionPtr splitInto(const RegionMeta & meta); + Regions execBatchSplit(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response); void execChangePeer(const raft_cmdpb::AdminRequest & request, const raft_cmdpb::AdminResponse & response); private: @@ -223,7 +248,9 @@ class Region : public std::enable_shared_from_this // Size of data cf & write cf, without lock cf. std::atomic cf_data_size = 0; - Timepoint last_persist_time = Clock::now(); + std::atomic last_persist_time = Clock::now(); + + std::atomic persist_parm = 1; Logger * log; }; diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index 7defbb586f1..b1c2c0d3ef6 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -35,8 +35,8 @@ static const Field MockDecodeRow(TiDB::CodecFlag flag) } } -Block RegionBlockReader::read(RegionID my_region_id, const TiDB::TableInfo & table_info, - const ColumnsDescription & columns, const Names & ordered_columns_, RegionConcatedScanRemover & scanner) +Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescription & columns, + const Names & ordered_columns_, ScannerPtr & scanner, std::vector * keys) { // Note: this code below is mostly ported from RegionBlockInputStream. Names ordered_columns = ordered_columns_; @@ -72,18 +72,17 @@ Block RegionBlockReader::read(RegionID my_region_id, const TiDB::TableInfo & tab const auto & date_lut = DateLUT::instance(); - // TODO: lock partition, to avoid region adding/droping while writing data + // TODO: lock region, to avoid region adding/droping while writing data TableID my_table_id = table_info.id; TableID next_table_id = InvalidTableID; - RegionID next_region_id = InvalidRegionID; // Here we use do{}while() instead of while(){}, // Because the first check of scanner.hasNext() already been done outside of this function. 
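+    // Committed write-cf keys consumed below are appended to `keys` (when provided) so the caller can remove them from the region cache after the block has been flushed.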
while (true) { // TODO: comfirm all this mess - auto [handle, write_type, commit_ts, value] = scanner.next(); + auto [handle, write_type, commit_ts, value] = scanner->next(keys); if (write_type == Region::PutFlag || write_type == Region::DelFlag) { @@ -158,9 +157,9 @@ Block RegionBlockReader::read(RegionID my_region_id, const TiDB::TableInfo & tab column_map[handle_id].first->insert(Field(handle)); } - std::tie(next_table_id, next_region_id, std::ignore) = scanner.hasNext(); + next_table_id = scanner->hasNext(); - if (next_table_id == InvalidTableID || (next_table_id != my_table_id) || (next_region_id != my_region_id)) + if (next_table_id == InvalidTableID || next_table_id != my_table_id) break; } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index b07ec5ed4c9..eaa0aa09760 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -22,145 +22,9 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -class RegionConcatedScanRemover -{ -public: - RegionConcatedScanRemover(Regions && regions_, TableID table_id, bool remove_on_read_, bool resolve_locks_, UInt64 start_ts_) - : regions(std::move(regions_)), expected_table_id(table_id), remove_on_read(remove_on_read_), resolve_locks(resolve_locks_), start_ts(start_ts_) - { - if (regions.empty()) - throw Exception("empty regions", ErrorCodes::LOGICAL_ERROR); - - std::sort(regions.begin(), regions.end(), [](const RegionPtr & r1, const RegionPtr & r2) { - return r1->getRange() < r2->getRange(); - }); - - curr_region_index = 0; - curr_region_id = regions[curr_region_index]->id(); - curr_scanner = regions[0]->createCommittedScanRemover(InvalidTableID); - - if (!remove_on_read && resolve_locks) curr_lock_info = curr_scanner->getLockInfo(expected_table_id, start_ts); - } - - // return {InvalidTableID, InvalidRegionID} when no more data - std::tuple hasNext() - { - if (curr_lock_info) - { - // lock seen, will ignore data in the rest regions, and find next lock. - ever_seen_lock = true; - auto lock_info = std::move(curr_lock_info); - - while ((++curr_region_index) < regions.size()) - { - curr_scanner = regions[curr_region_index]->createCommittedScanRemover(expected_table_id); - curr_region_id = regions[curr_region_index]->id(); - if ((curr_lock_info = curr_scanner->getLockInfo(expected_table_id, start_ts)) != nullptr) - break; - } - - return std::make_tuple(expected_table_id, curr_region_id, std::move(lock_info)); - } - else if (ever_seen_lock) - { - // Ever seen lock and no other lock left, finish. - return std::make_tuple(InvalidTableID, InvalidRegionID, nullptr); - } - - auto table_id = curr_scanner->hasNext(); - - if (table_id != InvalidTableID) - { - if (curr_table_id == InvalidTableID) - curr_table_id = table_id; - if (remove_on_read && table_id != curr_table_id) - curr_scanner->remove(curr_table_id); - - curr_table_id = table_id; - return std::make_tuple(table_id, curr_region_id, nullptr); - } - else - { - if (remove_on_read && curr_table_id != InvalidTableID) - curr_scanner->remove(curr_table_id); - - // end of current region, go to next. - while ((++curr_region_index) < regions.size()) - { - curr_scanner = regions[curr_region_index]->createCommittedScanRemover(expected_table_id); - curr_region_id = regions[curr_region_index]->id(); - - // check lock before checking data. 
- if (resolve_locks && (curr_lock_info = curr_scanner->getLockInfo(expected_table_id, start_ts)) != nullptr) - return std::make_tuple(expected_table_id, curr_region_id, nullptr); - - if ((curr_table_id = curr_scanner->hasNext()) != InvalidTableID) - return std::make_tuple(curr_table_id, curr_region_id, nullptr); - } - return std::make_tuple(InvalidTableID, InvalidRegionID, nullptr); - } - } - - auto next() { return curr_scanner->next(); } - -private: - Regions regions; - TableID expected_table_id; - bool remove_on_read; - bool resolve_locks; - UInt64 start_ts; - - size_t curr_region_index = 0; - RegionID curr_region_id = InvalidRegionID; - TableID curr_table_id = InvalidTableID; - - using ScannerPtr = std::unique_ptr; - - ScannerPtr curr_scanner; - - Region::LockInfoPtr curr_lock_info = nullptr; - bool ever_seen_lock = false; -}; - -// FIXME: remove_on_read is not safe because cache will be removed before data flush into storage. -// If the process reboot during flush, it could lead to data loss. -// Please design another mechanism to gracefully remove cache after flush. -class RegionBlockReader -{ -public: - using SchemaFetcher = std::function(TableID)>; - - RegionBlockReader(SchemaFetcher schema_fetcher_, Regions && regions, TableID table_id, bool remove_on_read, bool resolve_locks, UInt64 start_ts) - : schema_fetcher(schema_fetcher_), scanner(std::move(regions), table_id, remove_on_read, resolve_locks, start_ts) - { - } - - std::tuple next() - { - auto [table_id, region_id, lock_info] = scanner.hasNext(); - if (lock_info) - { - return {Block(), table_id, region_id, std::move(lock_info)}; - } - else if (table_id != InvalidTableID) - { - const auto [table_info, columns, ordered_columns] = schema_fetcher(table_id); - auto block = read(region_id, *table_info, *columns, *ordered_columns, scanner); - return {block, table_id, region_id, nullptr}; - } - else - { - return {Block(), InvalidTableID, InvalidRegionID, nullptr}; - } - } - -private: - Block read(RegionID my_region_id, const TiDB::TableInfo & table_info, - const ColumnsDescription & columns, const Names & ordered_columns_, RegionConcatedScanRemover & scanner); +using ScannerPtr = std::unique_ptr; -private: - SchemaFetcher schema_fetcher; - RegionConcatedScanRemover scanner; -}; +Block RegionBlockRead(const TiDB::TableInfo & table_info, + const ColumnsDescription & columns, const Names & ordered_columns_, ScannerPtr & curr_scanner, std::vector * keys= nullptr); } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index 2116b0d2d9c..01a5edfad13 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -12,10 +12,8 @@ size_t RegionMeta::serializeSize() const auto peer_size = sizeof(UInt64) + sizeof(UInt64) + sizeof(bool); // TODO: this region_size not right, 4 bytes missed - auto region_size = sizeof(UInt64) - + sizeof(UInt32) + KEY_SIZE_WITHOUT_TS - + sizeof(UInt32) + KEY_SIZE_WITHOUT_TS - + sizeof(UInt64) + sizeof(UInt64); + auto region_size + = sizeof(UInt64) + sizeof(UInt32) + KEY_SIZE_WITHOUT_TS + sizeof(UInt32) + KEY_SIZE_WITHOUT_TS + sizeof(UInt64) + sizeof(UInt64); region_size += peer_size * region.peers_size(); auto apply_state_size = sizeof(UInt64) + sizeof(UInt64) + sizeof(UInt64); return peer_size + region_size + apply_state_size + sizeof(UInt64) + sizeof(bool); @@ -74,7 +72,7 @@ metapb::Region RegionMeta::getRegion() const return region; } -raft_serverpb::RaftApplyState RegionMeta::getApplyState() const 
+const raft_serverpb::RaftApplyState & RegionMeta::getApplyState() const { std::lock_guard lock(mutex); return apply_state; @@ -149,8 +147,8 @@ RegionRange RegionMeta::getRange() const std::string RegionMeta::toString(bool dump_status) const { std::lock_guard lock(mutex); - std::string status_str = !dump_status ? "" : ", term: " + DB::toString(applied_term) + - ", applied_index: " + DB::toString(apply_state.applied_index()); + std::string status_str + = !dump_status ? "" : ", term: " + DB::toString(applied_term) + ", applied_index: " + DB::toString(apply_state.applied_index()); return "region[id: " + DB::toString(region.id()) + status_str + "]"; } @@ -161,17 +159,59 @@ bool RegionMeta::isPendingRemove() const } void RegionMeta::setPendingRemove() +{ + { + std::lock_guard lock(mutex); + pending_remove = true; + } + cv.notify_all(); +} + +void RegionMeta::waitIndex(UInt64 index) +{ + std::unique_lock lk(mutex); + cv.wait(lk, [this, index] { + return pending_remove || apply_state.applied_index() >= index; + }); +} + +UInt64 RegionMeta::version() const +{ + std::lock_guard lock(mutex); + return region.region_epoch().version(); +} + +UInt64 RegionMeta::confVer() const +{ + std::lock_guard lock(mutex); + return region.region_epoch().conf_ver(); +} + +void RegionMeta::swap(RegionMeta & other) { std::lock_guard lock(mutex); - pending_remove = true; + peer.Swap(&other.peer); + region.Swap(&other.region); + apply_state.Swap(&other.apply_state); + std::swap(applied_term, other.applied_term); + std::swap(pending_remove, other.pending_remove); } -void RegionMeta::wait_index(UInt64 index) { - std::cout<<"get index: "< lk(mutex); - cv.wait(lk, [this, index]{ - return apply_state.applied_index() >= index; - }); + + auto mutable_peers = region.mutable_peers(); + + for (auto it = mutable_peers->begin(); it != mutable_peers->end(); ++it) + { + if (it->store_id() == store_id) + { + mutable_peers->erase(it); + return; + } + } + throw Exception("peer with store_id " + DB::toString(store_id) + " not found", ErrorCodes::LOGICAL_ERROR); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index 2e15b11c7b6..980033dfd87 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -40,12 +40,16 @@ class RegionMeta pingcap::kv::RegionVerID getRegionVerID() const { return pingcap::kv::RegionVerID { region.id(), - region.region_epoch().conf_ver(), - region.region_epoch().version() + confVer(), + version() }; } - raft_serverpb::RaftApplyState getApplyState() const; + UInt64 version() const; + + UInt64 confVer() const; + + const raft_serverpb::RaftApplyState & getApplyState() const; void setRegion(const metapb::Region & region); void setApplied(UInt64 index, UInt64 term); @@ -62,14 +66,7 @@ class RegionMeta bool isPendingRemove() const; void setPendingRemove(); - void swap(RegionMeta & other) - { - peer.Swap(&other.peer); - region.Swap(&other.region); - apply_state.Swap(&other.apply_state); - std::swap(applied_term, other.applied_term); - std::swap(pending_remove, other.pending_remove); - } + void swap(RegionMeta & other); friend bool operator==(const RegionMeta & meta1, const RegionMeta & meta2) { @@ -77,7 +74,9 @@ class RegionMeta && meta1.applied_term == meta2.applied_term; } - void wait_index(UInt64 index); + void waitIndex(UInt64 index); + + void removePeer(UInt64 store_id); private: metapb::Peer peer; diff --git a/dbms/src/Storages/Transaction/RegionPartition.cpp 
b/dbms/src/Storages/Transaction/RegionPartition.cpp deleted file mode 100644 index 56d6a667228..00000000000 --- a/dbms/src/Storages/Transaction/RegionPartition.cpp +++ /dev/null @@ -1,540 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -extern const int UNKNOWN_TABLE; -} // namespace ErrorCodes - -// ============================================================= -// Static methods. -// ============================================================= - -auto getRegionTableIds(const RegionPtr & region) -{ - std::unordered_set table_ids; - { - auto scanner = region->createCommittedScanRemover(InvalidTableID); - while (true) - { - TableID table_id = scanner->hasNext(); - if (table_id == InvalidTableID) - break; - table_ids.emplace(table_id); - scanner->next(); - } - } - return table_ids; -} - -Int64 calculatePartitionCacheBytes(KVStore & kvstore, const std::set & region_ids) -{ - Int64 bytes = 0; - for (auto region_id : region_ids) - { - bytes += kvstore.getRegion(region_id)->dataSize(); - } - return bytes; -} - -// ============================================================= -// Private member functions. -// ============================================================= - -RegionPartition::Table & RegionPartition::getOrCreateTable(TableID table_id) -{ - auto it = tables.find(table_id); - if (it == tables.end()) - { - // Load persisted info. - auto & tmt_ctx = context.getTMTContext(); - auto storage = tmt_ctx.storages.get(table_id); - if (!storage) - { - tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); - storage = tmt_ctx.storages.get(table_id); - } - - auto * merge_tree = dynamic_cast(storage.get()); - auto partition_number = merge_tree->getData().settings.mutable_mergetree_partition_number; - - std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id, partition_number); - - auto & table = it->second; - table.partitions.persist(); - } - return it->second; -} - -size_t RegionPartition::selectPartitionId(Table & table, RegionID /*region_id*/) -{ - // size_t partition_id = rng() % table.partitions.get().size(); - size_t partition_id = (next_partition_id++) % table.partitions.get().size(); - return partition_id; -} - -std::pair RegionPartition::insertRegion(Table & table, size_t partition_id, RegionID region_id) -{ - auto & table_partitions = table.partitions.get(); - // Insert table mapping. - table_partitions[partition_id].region_ids.emplace(region_id); - table.partitions.persist(); - - // Insert region mapping. - auto r_it = regions.find(region_id); - if (r_it == regions.end()) - std::tie(r_it, std::ignore) = regions.try_emplace(region_id); - RegionInfo & region_info = r_it->second; - region_info.table_to_partition.emplace(table.table_id, partition_id); - - return {partition_id, table_partitions[partition_id]}; -} - -std::pair RegionPartition::getOrInsertRegion(TableID table_id, RegionID region_id) -{ - auto & table = getOrCreateTable(table_id); - auto & table_partitions = table.partitions.get(); - - for (size_t partition_index = 0; partition_index < table_partitions.size(); ++partition_index) - { - const auto & partition = table_partitions[partition_index]; - if (partition.region_ids.find(region_id) != partition.region_ids.end()) - return {partition_index, table_partitions[partition_index]}; - } - - // Currently region_id does not exist in any regions in this table, let's insert into one. 
- size_t partition_id = selectPartitionId(table, region_id); - LOG_DEBUG(log, "Table " << table_id << " assign region " << region_id << " to partition " << partition_id); - - return insertRegion(table, partition_id, region_id); -} - -void RegionPartition::updateRegionRange(const RegionPtr & region) -{ - auto region_id = region->id(); - const auto range = region->getRange(); - - auto it = regions.find(region_id); - // if this region does not exist already, then nothing to shrink. - if (it == regions.end()) - return; - - RegionInfo & region_info = it->second; - auto t_it = region_info.table_to_partition.begin(); - while (t_it != region_info.table_to_partition.end()) - { - auto table_id = t_it->first; - auto partition_id = t_it->second; - if (TiKVRange::checkTableInvolveRange(table_id, range)) - { - ++t_it; - continue; - } - - // remove from table mapping - auto table_it = tables.find(table_id); - if (table_it == tables.end()) - { - throw Exception("Table " + DB::toString(table_id) + " not found in table map", ErrorCodes::LOGICAL_ERROR); - } - - Table & table = table_it->second; - auto & partition = table.partitions.get().at(partition_id); - partition.region_ids.erase(region_id); - table.partitions.persist(); - - // remove from region mapping - t_it = region_info.table_to_partition.erase(t_it); - } -} - -bool RegionPartition::shouldFlush(const Partition & partition) -{ - if (partition.pause_flush) - return false; - if (partition.must_flush) - return true; - if (!partition.updated || !partition.cache_bytes) - return false; - auto period_time = Clock::now() - partition.last_flush_time; - for (auto && [th_bytes, th_duration] : flush_thresholds) - { - if (partition.cache_bytes >= th_bytes && period_time >= th_duration) - return true; - } - return false; -} - -void RegionPartition::flushPartition(TableID table_id, PartitionID partition_id) -{ - if (log->debug()) - { - auto & table = getOrCreateTable(table_id); - auto & partition = table.partitions.get()[partition_id]; - std::string region_ids; - for (auto id : partition.region_ids) - region_ids += DB::toString(id) + ","; - if (!region_ids.empty()) - region_ids.pop_back(); - LOG_DEBUG(log, - "Flush regions - table_id: " + DB::toString(table_id) + ", partition_id: " + DB::toString(partition_id) + ", ~ " - + DB::toString(partition.cache_bytes) + " bytes, containing region_ids: " + region_ids); - } - - TMTContext & tmt = context.getTMTContext(); - tmt.getSchemaSyncer()->syncSchema(table_id, context); - - StoragePtr storage = tmt.storages.get(table_id); - - // TODO: handle if storage is nullptr - // drop table and create another with same name, but the previous one will still flush - if (storage == nullptr) - { - - LOG_ERROR(log, "table " << table_id << " flush partition " << partition_id << " , but storage is not found"); - return; - } - - auto * merge_tree = dynamic_cast(storage.get()); - - const auto & table_info = merge_tree->getTableInfo(); - const auto & columns = merge_tree->getColumns(); - // TODO: confirm names is right - Names names = columns.getNamesOfPhysical(); - - BlockInputStreamPtr input = getBlockInputStreamByPartition(table_id, partition_id, table_info, columns, names, true, false, false, 0); - if (!input) - return; - - TxnMergeTreeBlockOutputStream output(*merge_tree, partition_id); - input->readPrefix(); - output.writePrefix(); - while (true) - { - Block block = input->read(); - if (!block || block.rows() == 0) - break; - output.write(block); - } - output.writeSuffix(); - input->readSuffix(); -} - -// 
============================================================= -// Public member functions. -// ============================================================= - -static const Int64 FTH_BYTES_1 = 1024; // 1 KB -static const Int64 FTH_BYTES_2 = 1024 * 1024; // 1 MB -static const Int64 FTH_BYTES_3 = 1024 * 1024 * 10; // 10 MBs -static const Int64 FTH_BYTES_4 = 1024 * 1024 * 50; // 50 MBs - -static const Seconds FTH_PERIOD_1(60 * 60); // 1 hour -static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes -static const Seconds FTH_PERIOD_3(60); // 1 minute -static const Seconds FTH_PERIOD_4(5); // 5 seconds - -RegionPartition::RegionPartition(Context & context_, const std::string & parent_path_, std::function region_fetcher) - : parent_path(parent_path_), - flush_thresholds{ - {FTH_BYTES_1, FTH_PERIOD_1}, - {FTH_BYTES_2, FTH_PERIOD_2}, - {FTH_BYTES_3, FTH_PERIOD_3}, - {FTH_BYTES_4, FTH_PERIOD_4}, - }, - context(context_), - log(&Logger::get("RegionPartition")) -{ - Poco::File dir(parent_path + "tables/"); - if (!dir.exists()) - dir.createDirectories(); - - std::vector file_names; - dir.list(file_names); - - // Restore all table mappings and region mappings. - for (auto & name : file_names) - { - TableID table_id = std::stoull(name); - auto p = tables.try_emplace(table_id, parent_path + "tables/", table_id); - Table & table = p.first->second; - for (PartitionID partition_id = 0; partition_id < table.partitions.get().size(); ++partition_id) - { - auto & partition = table.partitions.get()[partition_id]; - auto it = partition.region_ids.begin(); - while (it != partition.region_ids.end()) - { - // Update cache infos - auto region_id = *it; - auto region_ptr = region_fetcher(region_id); - if (!region_ptr) - { - // It could happen that process crash after region split or region snapshot apply, - // and region has not been persisted, but region <-> partition mapping does. 
- it = partition.region_ids.erase(it); - LOG_WARNING(log, "Region " << region_id << " not found from KVStore, dropped."); - continue; - } - else - { - ++it; - } - partition.cache_bytes += region_ptr->dataSize(); - - // Update region_id -> table_id & partition_id - auto it = regions.find(region_id); - if (it == regions.end()) - std::tie(it, std::ignore) = regions.try_emplace(region_id); - RegionInfo & region_info = it->second; - region_info.table_to_partition.emplace(table_id, partition_id); - } - } - - table.partitions.persist(); - } -} - -void RegionPartition::updateRegion(const RegionPtr & region, size_t before_cache_bytes, TableIDSet relative_table_ids) -{ - std::lock_guard lock(mutex); - - auto region_id = region->id(); - Int64 delta = region->dataSize() - before_cache_bytes; - for (auto table_id : relative_table_ids) - { - auto & partition = getOrInsertRegion(table_id, region_id).second; - partition.updated = true; - partition.cache_bytes += delta; - } -} - -void RegionPartition::applySnapshotRegion(const RegionPtr & region) -{ - std::lock_guard lock(mutex); - - auto region_id = region->id(); - auto table_ids = getRegionTableIds(region); - for (auto table_id : table_ids) - { - auto & partition = getOrInsertRegion(table_id, region_id).second; - partition.must_flush = true; - partition.cache_bytes += region->dataSize(); - } -} - -void RegionPartition::splitRegion(const RegionPtr & region, std::vector split_regions) -{ - struct MoveAction - { - StorageMergeTree * storage; - PartitionID current_partition_id; - PartitionID new_partition_id; - Field start; - Field end; - }; - std::vector move_actions; - - { - std::lock_guard lock(mutex); - - auto region_id = region->id(); - auto it = regions.find(region_id); - - if (it == regions.end()) - { - // If region doesn't exist, usually means it does not contain any data we interested. Just ignore it. - return; - } - - RegionInfo & region_info = it->second; - auto & tmt_ctx = context.getTMTContext(); - for (auto [table_id, current_partition_id] : region_info.table_to_partition) - { - auto storage = tmt_ctx.storages.get(table_id); - if (storage == nullptr) - { - throw Exception("Table " + DB::toString(table_id) + " not found", ErrorCodes::UNKNOWN_TABLE); - } - - auto * merge_tree_storage = dynamic_cast(storage.get()); - - auto & table = getOrCreateTable(table_id); - auto & table_partitions = table.partitions.get(); - // Tables which have only one partition don't need to move data. - if (table_partitions.size() == 1) - continue; - - for (const RegionPtr & split_region : split_regions) - { - const auto range = split_region->getRange(); - // This region definitely does not contain any data in this table. - if (!TiKVRange::checkTableInvolveRange(table_id, range)) - continue; - - // Select another partition other than current_partition_id; - auto split_region_id = split_region->id(); - size_t new_partition_id; - while ((new_partition_id = selectPartitionId(table, split_region_id)) == current_partition_id) {} - LOG_DEBUG(log, "Table " << table.table_id << " assign region " << split_region_id << " to partition " << new_partition_id); - - auto [start_field, end_field] = getRegionRangeField(range.first, range.second, table_id); - move_actions.push_back({merge_tree_storage, current_partition_id, new_partition_id, start_field, end_field}); - - auto & partition = insertRegion(table, new_partition_id, split_region_id).second; - // Mark flush flag. 
- partition.must_flush = true; - } - } - - updateRegionRange(region); - } - - // FIXME: move data should be locked for safety, and do it aysnchronized. - for (const auto & action : move_actions) - { - moveRangeBetweenPartitions(context, action.storage, action.current_partition_id, action.new_partition_id, action.start, action.end); - } -} - -void RegionPartition::removeRegion(const RegionPtr & region) -{ - std::unordered_map table_partitions; - auto region_id = region->id(); - auto region_cache_bytes = region->dataSize(); - { - std::lock_guard lock(mutex); - - auto r_it = regions.find(region_id); - if (r_it == regions.end()) - { - LOG_WARNING(log, "Being removed region " << region_id << " does not exist."); - return; - } - RegionInfo & region_info = r_it->second; - table_partitions.swap(region_info.table_to_partition); - - regions.erase(region_id); - - for (auto [table_id, partition_id] : table_partitions) - { - auto & table = getOrCreateTable(table_id); - Partition & partition = table.partitions.get().at(partition_id); - partition.cache_bytes -= region_cache_bytes; - partition.region_ids.erase(region_id); - table.partitions.persist(); - } - } - - // Note that we cannot use lock to protect following code, as deleteRangeInPartition will result in - // calling getBlockInputStreamByPartition and lead to dead lock. - auto & tmt_ctx = context.getTMTContext(); - for (auto [table_id, partition_id] : table_partitions) - { - auto storage = tmt_ctx.storages.get(table_id); - if (storage == nullptr) - { - LOG_WARNING(log, "RegionPartition::removeRegion: " << table_id << " does not exist."); - continue; - } - auto * merge_tree = dynamic_cast(storage.get()); - auto [start_key, end_key] = region->getRange(); - auto [start_field, end_field] = getRegionRangeField(start_key, end_key, table_id); - deleteRangeInPartition(context, merge_tree, partition_id, start_field, end_field); - } -} - -bool RegionPartition::tryFlushRegions() -{ - KVStore & kvstore = *context.getTMTContext().kvstore; - std::set> to_flush; - { - traversePartitions([&](TableID table_id, PartitionID partition_id, Partition & partition) { - if (shouldFlush(partition)) - { - to_flush.emplace(table_id, partition_id); - // Stop other flush threads. - partition.pause_flush = true; - } - }); - } - - for (auto [table_id, partition_id] : to_flush) - { - flushPartition(table_id, partition_id); - } - - { - // Now reset status infomations. 
- Timepoint now = Clock::now(); - traversePartitions([&](TableID table_id, PartitionID partition_id, Partition & partition) { - if (to_flush.count({table_id, partition_id})) - { - partition.pause_flush = false; - partition.must_flush = false; - partition.updated = false; - partition.cache_bytes = calculatePartitionCacheBytes(kvstore, partition.region_ids); - partition.last_flush_time = now; - } - }); - } - - return !to_flush.empty(); -} - -void RegionPartition::traversePartitions(std::function callback) -{ - std::lock_guard lock(mutex); - for (auto && [table_id, table] : tables) - { - size_t id = 0; - for (auto & partition : table.partitions.get()) - { - callback(table_id, id, partition); - ++id; - } - } -} - -void RegionPartition::traverseRegionsByTablePartition( - const TableID table_id, const PartitionID partition_id, std::function callback) -{ - auto & kvstore = context.getTMTContext().kvstore; - Regions partition_regions; - { - std::lock_guard lock(mutex); - auto & table = getOrCreateTable(table_id); - auto & partition = table.partitions.get()[partition_id]; - - for (auto & region_id : partition.region_ids) - { - auto region = kvstore->getRegion(region_id); - if (!region) - throw Exception("Region " + DB::toString(region_id) + " not found!", ErrorCodes::LOGICAL_ERROR); - partition_regions.push_back(region); - } - } - callback(partition_regions); -} - -void RegionPartition::dumpRegionMap(RegionPartition::RegionMap & res) -{ - std::lock_guard lock(mutex); - res = regions; -} - -void RegionPartition::dropRegionsInTable(TableID /*table_id*/) -{ - // TODO: impl -} - -} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionPartition.h b/dbms/src/Storages/Transaction/RegionPartition.h deleted file mode 100644 index 0575702372f..00000000000 --- a/dbms/src/Storages/Transaction/RegionPartition.h +++ /dev/null @@ -1,191 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include -#include - -#include -#include - -namespace DB -{ - -/// Store and manage the mapping between regions and partitions. -/// -/// A table is consisted by multiple partitions, and a partition is consisted by multiple regions. -/// A region could be contained by multiple tables. And in a table, one specific region can only exist in one partition. -/// Regions have [start_end, end_key) bound, which is used to determine which tables it could be contained. 
-/// -/// Here is an example of storage range of region data: -/// |.......region 0.........|.......region 1.........|.......................region 2..........| -/// ^ ^ ^ ^ ^ -/// | | | | | -/// ------part a1----------- --------par a2---------- ------par b1------- --------par c1------- -/// | | | | -/// ----------------------table a-------------------- -----table b------- --------table c------ -/// -class RegionPartition : private boost::noncopyable -{ -public: - struct Partition - { - using RegionIDSet = std::set; - RegionIDSet region_ids; - - Partition() {} - Partition(const Partition & p) : region_ids(p.region_ids) {} - Partition(RegionIDSet && region_ids_) : region_ids(std::move(region_ids_)) {} - - struct Write - { - void operator()(const Partition & p, DB::WriteBuffer & buf) - { - writeIntBinary(p.region_ids.size(), buf); - for (auto id : p.region_ids) - { - writeIntBinary(id, buf); - } - } - }; - - struct Read - { - Partition operator()(DB::ReadBuffer & buf) - { - UInt64 size; - readIntBinary(size, buf); - RegionIDSet region_ids; - for (size_t i = 0; i < size; ++i) - { - RegionID id; - readIntBinary(id, buf); - region_ids.insert(id); - } - return {std::move(region_ids)}; - } - }; - - // Statistics to serve flush. - // Note that it is not 100% accurate because one region could belongs to more than one partition by different tables. - // And when a region is updated, we don't distinguish carefully which partition, simply update them all. - // It is not a real issue as do flush on a partition is not harmful. Besides, the situation is very rare that a region belongs to many partitions. - // Those members below are not persisted. - - bool pause_flush = false; - bool must_flush = false; - bool updated = false; - Int64 cache_bytes = 0; - Timepoint last_flush_time = Clock::now(); - }; - - struct Table - { - Table(const std::string & parent_path, TableID table_id_, size_t partition_number) - : table_id(table_id_), partitions(parent_path + DB::toString(table_id)) - { - partitions.restore(); - - if (partitions.get().empty()) - { - std::vector new_partitions(partition_number); - partitions.get().swap(new_partitions); - - partitions.persist(); - } - } - - Table(const std::string & parent_path, TableID table_id_) : table_id(table_id_), partitions(parent_path + DB::toString(table_id)) - { - partitions.restore(); - } - - using Partitions = PersistedContainerVector; - - TableID table_id; - Partitions partitions; - }; - - struct RegionInfo - { - std::unordered_map table_to_partition; - }; - - using TableMap = std::unordered_map; - using RegionMap = std::unordered_map; - using FlushThresholds = std::vector>; - -private: - const std::string parent_path; - - TableMap tables; - RegionMap regions; - - FlushThresholds flush_thresholds; - - // std::minstd_rand rng = std::minstd_rand(randomSeed()); - size_t next_partition_id = 0; - - Context & context; - - mutable std::mutex mutex; - Logger * log; - -private: - Table & getOrCreateTable(TableID table_id); - size_t selectPartitionId(Table & table, RegionID region_id); - std::pair insertRegion(Table & table, size_t partition_id, RegionID region_id); - std::pair getOrInsertRegion(TableID table_id, RegionID region_id); - - /// This functional only shrink the table range of this region_id, range expand will (only) be done at flush. - /// Note that region update range should not affect the data in storage. 
- void updateRegionRange(const RegionPtr & region); - - bool shouldFlush(const Partition & partition); - void flushPartition(TableID table_id, PartitionID partition_id); - -public: - RegionPartition(Context & context_, const std::string & parent_path_, std::function region_fetcher); - void setFlushThresholds(FlushThresholds flush_thresholds_) { flush_thresholds = std::move(flush_thresholds_); } - - /// After the region is updated (insert or delete KVs). - void updateRegion(const RegionPtr & region, size_t before_cache_bytes, TableIDSet relative_table_ids); - /// A new region arrived by apply snapshot command, this function store the region into selected partitions. - void applySnapshotRegion(const RegionPtr & region); - /// Manage data after region split into split_regions. - /// i.e. split_regions could have assigned to another partitions, we need to move the data belong with them. - void splitRegion(const RegionPtr & region, std::vector split_regions); - /// Remove a region from corresponding partitions. - void removeRegion(const RegionPtr & region); - - /// Try pick some regions and flush. - /// Note that flush is organized by partition. i.e. if a regions is selected to be flushed, all regions belong to its partition will also flushed. - /// This function will be called constantly by background threads. - /// Returns whether this function has done any meaningful job. - bool tryFlushRegions(); - - void traversePartitions(std::function callback); - void traverseRegionsByTablePartition(const TableID table_id, const PartitionID partition_id, std::function callback); - - BlockInputStreamPtr getBlockInputStreamByPartition( // - TableID table_id, - UInt64 partition_id, - const TiDB::TableInfo & table_info, - const ColumnsDescription & columns, - const Names & ordered_columns, - bool remove_on_read, - bool learner_read, - bool resolve_locks, - UInt64 start_ts); - - // For debug - void dumpRegionMap(RegionPartition::RegionMap & res); - void dropRegionsInTable(TableID table_id); -}; - -using RegionPartitionPtr = std::shared_ptr; - -} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index fd8fa4453aa..eabf83f777c 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -71,6 +71,7 @@ void RegionPersister::persist(const RegionPtr & region) { /// Multi threads persist is not yet supported. std::lock_guard persist_lock(persist_mutex); + size_t persist_parm = region->persistParm(); auto region_id = region->id(); @@ -94,6 +95,9 @@ void RegionPersister::persist(const RegionPtr & region) if (!exists) coverOldRegion(cur_file, region_id); } + + region->markPersisted(); + region->updatePersistParm(persist_parm); } /// Old regions are cover by newer regions with the same id. diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp new file mode 100644 index 00000000000..9042f003f8d --- /dev/null +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -0,0 +1,474 @@ +#include + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_TABLE; +} // namespace ErrorCodes + +// ============================================================= +// Static methods. 
+// ============================================================= + +auto getRegionTableIds(const RegionPtr & region) +{ + std::unordered_set table_ids; + { + auto scanner = region->createCommittedScanRemover(InvalidTableID); + while (true) + { + TableID table_id = scanner->hasNext(); + if (table_id == InvalidTableID) + break; + table_ids.emplace(table_id); + scanner->next(); + } + } + return table_ids; +} + +// ============================================================= +// Private member functions. +// ============================================================= + +RegionTable::Table & RegionTable::getOrCreateTable(TableID table_id) +{ + auto it = tables.find(table_id); + if (it == tables.end()) + { + // Load persisted info. + auto & tmt_ctx = context.getTMTContext(); + auto storage = tmt_ctx.storages.get(table_id); + if (!storage) + { + tmt_ctx.getSchemaSyncer()->syncSchema(table_id, context); + storage = tmt_ctx.storages.get(table_id); + } + + std::tie(it, std::ignore) = tables.try_emplace(table_id, parent_path + "tables/", table_id); + + auto & table = it->second; + table.regions.persist(); + } + return it->second; +} + +RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, RegionID region_id) +{ + auto & table_regions = table.regions.get(); + // Insert table mapping. + table.regions.get().emplace(region_id, InternalRegion(region_id)); + table.regions.persist(); + + // Insert region mapping. + auto r_it = regions.find(region_id); + if (r_it == regions.end()) + std::tie(r_it, std::ignore) = regions.try_emplace(region_id); + RegionInfo & region_info = r_it->second; + region_info.tables.emplace(table.table_id); + + return table_regions[region_id]; +} + +RegionTable::InternalRegion & RegionTable::getOrInsertRegion(TableID table_id, RegionID region_id) +{ + auto & table = getOrCreateTable(table_id); + auto & table_regions = table.regions.get(); + if (auto it = table_regions.find(region_id); it != table_regions.end()) + return it->second; + + return insertRegion(table, region_id); +} + +void RegionTable::updateRegionRange(const RegionPtr & region) +{ + auto region_id = region->id(); + const auto range = region->getRange(); + + auto it = regions.find(region_id); + // if this region does not exist already, then nothing to shrink. 
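+    // Note: this only shrinks the table <-> region mappings kept here; data already flushed into storage is left untouched.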
+ if (it == regions.end()) + return; + + RegionInfo & region_info = it->second; + auto t_it = region_info.tables.begin(); + while (t_it != region_info.tables.end()) + { + auto table_id = *t_it; + if (TiKVRange::checkTableInvolveRange(table_id, range)) + { + ++t_it; + continue; + } + + // remove from table mapping + auto table_it = tables.find(table_id); + if (table_it == tables.end()) + { + throw Exception("Table " + DB::toString(table_id) + " not found in table map", ErrorCodes::LOGICAL_ERROR); + } + + Table & table = table_it->second; + table.regions.get().erase(region_id); + table.regions.persist(); + + // remove from region mapping + t_it = region_info.tables.erase(t_it); + } +} + +bool RegionTable::shouldFlush(const InternalRegion & region) +{ + if (region.pause_flush) + return false; + if (region.must_flush) + return true; + if (!region.updated || !region.cache_bytes) + return false; + auto period_time = Clock::now() - region.last_flush_time; + for (auto && [th_bytes, th_duration] : flush_thresholds) + { + if (region.cache_bytes >= th_bytes && period_time >= th_duration) + return true; + } + return false; +} + +void RegionTable::flushRegion(TableID table_id, RegionID region_id, size_t & rest_cache_size) +{ + if (log->debug()) + { + auto & table = getOrCreateTable(table_id); + auto & region = table.regions.get()[region_id]; + LOG_DEBUG(log, + "Flush region - table_id: " + DB::toString(table_id) + ", original " + DB::toString(region.cache_bytes) + + " bytes, region_id: " + DB::toString(region_id)); + } + + TMTContext & tmt = context.getTMTContext(); + tmt.getSchemaSyncer()->syncSchema(table_id, context); + + StoragePtr storage = tmt.storages.get(table_id); + + // TODO: handle if storage is nullptr + // drop table and create another with same name, but the previous one will still flush + if (storage == nullptr) + { + LOG_ERROR(log, "table " << table_id << " flush region " << region_id << " , but storage is not found"); + return; + } + + std::vector keys_to_remove; + { + auto merge_tree = std::dynamic_pointer_cast(storage); + + auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__); + + const auto & table_info = merge_tree->getTableInfo(); + const auto & columns = merge_tree->getColumns(); + // TODO: confirm names is right + Names names = columns.getNamesOfPhysical(); + auto [input, status, tol] = getBlockInputStreamByRegion( + table_id, region_id, InvalidRegionVersion, table_info, columns, names, false, false, 0, &keys_to_remove); + if (input == nullptr) + return; + + std::ignore = status; + std::ignore = tol; + + TxnMergeTreeBlockOutputStream output(*merge_tree); + input->readPrefix(); + output.writePrefix(); + while (true) + { + Block block = input->read(); + if (!block || block.rows() == 0) + break; + output.write(block); + } + output.writeSuffix(); + input->readSuffix(); + } + + // remove data in region + { + auto & kvstore = context.getTMTContext().kvstore; + auto region = kvstore->getRegion(region_id); + if (!region) + return; + auto scanner = region->createCommittedScanRemover(table_id); + for (const auto & key : keys_to_remove) + scanner->remove(key); + rest_cache_size = region->dataSize(); + LOG_TRACE(log, "region " << region_id << " data size after flush " << rest_cache_size); + } +} + +// ============================================================= +// Public member functions. 
+// ============================================================= + +static const Int64 FTH_BYTES_1 = 1024; // 1 KB +static const Int64 FTH_BYTES_2 = 1024 * 1024; // 1 MB +static const Int64 FTH_BYTES_3 = 1024 * 1024 * 10; // 10 MBs +static const Int64 FTH_BYTES_4 = 1024 * 1024 * 50; // 50 MBs + +static const Seconds FTH_PERIOD_1(60 * 60); // 1 hour +static const Seconds FTH_PERIOD_2(60 * 5); // 5 minutes +static const Seconds FTH_PERIOD_3(60); // 1 minute +static const Seconds FTH_PERIOD_4(5); // 5 seconds + +RegionTable::RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher) + : parent_path(parent_path_), + flush_thresholds{{FTH_BYTES_1, FTH_PERIOD_1}, {FTH_BYTES_2, FTH_PERIOD_2}, {FTH_BYTES_3, FTH_PERIOD_3}, {FTH_BYTES_4, FTH_PERIOD_4}}, + context(context_), + log(&Logger::get("RegionTable")) +{ + Poco::File dir(parent_path + "tables/"); + if (!dir.exists()) + dir.createDirectories(); + + std::vector file_names; + dir.list(file_names); + + // Restore all table mappings and region mappings. + for (auto & name : file_names) + { + TableID table_id = std::stoull(name); + auto p = tables.try_emplace(table_id, parent_path + "tables/", table_id); + Table & table = p.first->second; + + for (auto it = table.regions.get().begin(); it != table.regions.get().end();) + { + auto region_id = it->first; + auto & region = it->second; + auto region_ptr = region_fetcher(region_id); + if (!region_ptr) + { + // It could happen that process crash after region split or region snapshot apply, + // and region has not been persisted, but region <-> partition mapping does. + it = table.regions.get().erase(it); + LOG_WARNING(log, "Region " << region_id << " not found from KVStore, dropped."); + continue; + } + else + ++it; + + region.cache_bytes = region_ptr->dataSize(); + region.updated = true; + + // Update region_id -> table_id + { + auto it = regions.find(region_id); + if (it == regions.end()) + std::tie(it, std::ignore) = regions.try_emplace(region_id); + RegionInfo & region_info = it->second; + region_info.tables.emplace(table_id); + } + } + + table.regions.persist(); + } +} + +void RegionTable::updateRegion(const RegionPtr & region, const TableIDSet & relative_table_ids) +{ + std::lock_guard lock(mutex); + + auto region_id = region->id(); + size_t cache_bytes = region->dataSize(); + for (auto table_id : relative_table_ids) + { + auto & region = getOrInsertRegion(table_id, region_id); + region.updated = true; + region.cache_bytes = cache_bytes; + } +} + +void RegionTable::applySnapshotRegion(const RegionPtr & region) +{ + std::lock_guard lock(mutex); + + auto region_id = region->id(); + auto table_ids = getRegionTableIds(region); + for (auto table_id : table_ids) + { + auto & internal_region = getOrInsertRegion(table_id, region_id); + internal_region.must_flush = true; + internal_region.cache_bytes = region->dataSize(); + } +} + +void RegionTable::splitRegion(const RegionPtr & region, std::vector split_regions) +{ + std::lock_guard lock(mutex); + + auto region_id = region->id(); + auto it = regions.find(region_id); + + if (it == regions.end()) + { + // If region doesn't exist, usually means it does not contain any data we interested. Just ignore it. 
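+        // (A region absent from the mapping was never attached to any table, so there is nothing to hand over to the split regions.)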
+        return;
+    }
+
+    RegionInfo & region_info = it->second;
+    auto & tmt_ctx = context.getTMTContext();
+    for (auto table_id : region_info.tables)
+    {
+        auto storage = tmt_ctx.storages.get(table_id);
+        if (storage == nullptr)
+        {
+            throw Exception("Table " + DB::toString(table_id) + " not found", ErrorCodes::UNKNOWN_TABLE);
+        }
+
+        auto & table = getOrCreateTable(table_id);
+
+        for (const RegionPtr & split_region : split_regions)
+        {
+            const auto range = split_region->getRange();
+            if (!TiKVRange::checkTableInvolveRange(table_id, range))
+                continue;
+
+            auto split_region_id = split_region->id();
+
+            auto & region = insertRegion(table, split_region_id);
+            region.must_flush = true;
+        }
+    }
+
+    updateRegionRange(region);
+}
+
+void RegionTable::removeRegion(const RegionPtr & region)
+{
+    std::unordered_set tables;
+    {
+        auto region_id = region->id();
+
+        std::lock_guard lock(mutex);
+
+        auto r_it = regions.find(region_id);
+        if (r_it == regions.end())
+        {
+            LOG_WARNING(log, "Being removed region " << region_id << " does not exist.");
+            return;
+        }
+        RegionInfo & region_info = r_it->second;
+        tables.swap(region_info.tables);
+
+        regions.erase(region_id);
+
+        for (auto table_id : tables)
+        {
+            auto & table = getOrCreateTable(table_id);
+            table.regions.get().erase(region_id);
+            table.regions.persist();
+        }
+    }
+
+    auto & tmt_ctx = context.getTMTContext();
+    for (auto table_id : tables)
+    {
+        auto storage = tmt_ctx.storages.get(table_id);
+        if (storage == nullptr)
+        {
+            LOG_WARNING(log, "RegionTable::removeRegion: " << table_id << " does not exist.");
+            continue;
+        }
+        auto * merge_tree = dynamic_cast(storage.get());
+        auto [start_handle, end_handle] = region->getHandleRangeByTable(table_id);
+        deleteRange(context, merge_tree, start_handle, end_handle);
+    }
+}
+
+bool RegionTable::tryFlushRegions()
+{
+    std::map, size_t> to_flush;
+    {
+        traverseRegions([&](TableID table_id, InternalRegion & region) {
+            if (shouldFlush(region))
+            {
+                to_flush.insert_or_assign({table_id, region.region_id}, region.cache_bytes);
+                // Stop other flush threads.
+                region.pause_flush = true;
+            }
+        });
+    }
+
+    for (auto && [id, data] : to_flush)
+    {
+        flushRegion(id.first, id.second, data);
+    }
+
+    {
+        // Now reset the status information.
+        Timepoint now = Clock::now();
+        traverseRegions([&](TableID table_id, InternalRegion & region) {
+            if (auto it = to_flush.find({table_id, region.region_id}); it != to_flush.end())
+            {
+                region.pause_flush = false;
+                region.must_flush = false;
+                region.updated = false;
+                region.cache_bytes = it->second;
+                region.last_flush_time = now;
+            }
+        });
+    }
+
+    return !to_flush.empty();
+}
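tryFlushRegions above is written to be polled: it marks candidates under the lock (pause_flush keeps other flush threads from picking the same region), flushes outside the lock, then resets the bookkeeping. The header further below documents that it "will be called constantly by background threads" and returns whether it did any meaningful work. The loop below is a hypothetical caller, sketched only to show how that boolean can drive a back-off; the real scheduling is outside this diff, and try_flush merely stands in for a bound call to RegionTable::tryFlushRegions.

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Poll the flush entry point; sleep only when the last call found nothing to do.
void flushLoop(const std::function<bool()> & try_flush, const std::atomic<bool> & stopped)
{
    while (!stopped.load())
    {
        if (!try_flush())
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main()
{
    std::atomic<bool> stopped{false};
    int calls = 0;
    // A fake try_flush that "does work" three times and then asks the loop to stop.
    flushLoop(
        [&]() -> bool {
            if (++calls == 3)
                stopped = true;
            return true;
        },
        stopped);
}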
+
+void RegionTable::traverseRegions(std::function callback)
+{
+    std::lock_guard lock(mutex);
+    for (auto && [table_id, table] : tables)
+    {
+        for (auto & region_info : table.regions.get())
+        {
+            callback(table_id, region_info.second);
+        }
+    }
+}
+
+void RegionTable::traverseRegionsByTable(const TableID table_id, std::function callback)
+{
+    auto & kvstore = context.getTMTContext().kvstore;
+    Regions regions;
+    {
+        std::lock_guard lock(mutex);
+        auto & table = getOrCreateTable(table_id);
+
+        for (const auto & region_info : table.regions.get())
+        {
+            auto region = kvstore->getRegion(region_info.second.region_id);
+            if (!region)
+                throw Exception("Region " + DB::toString(region_info.second.region_id) + " not found!", ErrorCodes::LOGICAL_ERROR);
+            regions.push_back(region);
+        }
+    }
+    callback(regions);
+}
+
+void RegionTable::dumpRegionMap(RegionTable::RegionMap & res)
+{
+    std::lock_guard lock(mutex);
+    res = regions;
+}
+
+void RegionTable::dropRegionsInTable(TableID /*table_id*/)
+{
+    // TODO: impl
+}
+
+} // namespace DB
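RegionTable.cpp above keeps two indexes that must stay in sync: `regions` maps a region id to the set of table ids it overlaps, while each `Table` holds a persisted set of region ids; updateRegionRange, splitRegion and removeRegion all have to update both sides. The fragment below is a deliberately simplified, hypothetical model of that invariant using plain standard containers (no persistence, no locking), included only to make the bookkeeping easier to follow; the type aliases are local assumptions.

#include <cstdint>
#include <unordered_map>
#include <unordered_set>

using TableID = int64_t;
using RegionID = uint64_t;

// Two-way bookkeeping, analogous to RegionTable's `regions` and `tables` members.
struct Mapping
{
    std::unordered_map<RegionID, std::unordered_set<TableID>> region_to_tables;
    std::unordered_map<TableID, std::unordered_set<RegionID>> table_to_regions;

    // Roughly what getOrInsertRegion does: register the pair in both directions.
    void add(TableID table_id, RegionID region_id)
    {
        region_to_tables[region_id].insert(table_id);
        table_to_regions[table_id].insert(region_id);
    }

    // Roughly what removeRegion does: detach the region from every table it was attached to.
    void remove(RegionID region_id)
    {
        auto it = region_to_tables.find(region_id);
        if (it == region_to_tables.end())
            return;
        for (TableID table_id : it->second)
            table_to_regions[table_id].erase(region_id);
        region_to_tables.erase(it);
    }
};

int main()
{
    Mapping m;
    m.add(45, 2);
    m.add(46, 2); // region 2 overlaps two tables
    m.remove(2);  // must disappear from both table entries
    return (m.table_to_regions[45].empty() && m.table_to_regions[46].empty()) ? 0 : 1;
}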
diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h
new file mode 100644
index 00000000000..9352e117b73
--- /dev/null
+++ b/dbms/src/Storages/Transaction/RegionTable.h
@@ -0,0 +1,165 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+namespace DB
+{
+
+class RegionTable : private boost::noncopyable
+{
+public:
+    struct InternalRegion
+    {
+        InternalRegion() {}
+        InternalRegion(const InternalRegion & p) : region_id(p.region_id) {}
+        InternalRegion(RegionID region_id_) : region_id(region_id_) {}
+
+        RegionID region_id;
+        bool pause_flush = false;
+        bool must_flush = false;
+        bool updated = false;
+        Int64 cache_bytes = 0;
+        Timepoint last_flush_time = Clock::now();
+    };
+
+    struct Table
+    {
+        Table(const std::string & parent_path, TableID table_id_) : table_id(table_id_), regions(parent_path + DB::toString(table_id))
+        {
+            regions.restore();
+        }
+
+        struct Write
+        {
+            void operator()(const RegionID k, const InternalRegion & v, DB::WriteBuffer & buf)
+            {
+                std::ignore = v;
+                writeIntBinary(k, buf);
+            }
+        };
+
+        struct Read
+        {
+            std::pair operator()(DB::ReadBuffer & buf)
+            {
+                RegionID region_id;
+                readIntBinary(region_id, buf);
+                return {region_id, InternalRegion(region_id)};
+            }
+        };
+
+        using InternalRegions = PersistedContainerMap;
+
+        TableID table_id;
+        InternalRegions regions;
+    };
+
+    struct RegionInfo
+    {
+        std::unordered_set tables;
+    };
+
+    enum RegionReadStatus : UInt8
+    {
+        OK,
+        NOT_FOUND,
+        VERSION_ERROR,
+        PENDING_REMOVE,
+    };
+
+    static const String RegionReadStatusString(RegionReadStatus s)
+    {
+        switch (s)
+        {
+            case OK:
+                return "OK";
+            case NOT_FOUND:
+                return "NOT_FOUND";
+            case VERSION_ERROR:
+                return "VERSION_ERROR";
+            case PENDING_REMOVE:
+                return "PENDING_REMOVE";
+        }
+        return "Unknown";
+    };
+
+    using TableMap = std::unordered_map;
+    using RegionMap = std::unordered_map;
+    using FlushThresholds = std::vector>;
+
+private:
+    const std::string parent_path;
+
+    TableMap tables;
+    RegionMap regions;
+
+    FlushThresholds flush_thresholds;
+
+    Context & context;
+
+    mutable std::mutex mutex;
+    Logger * log;
+
+private:
+    Table & getOrCreateTable(TableID table_id);
+    InternalRegion & insertRegion(Table & table, RegionID region_id);
+    InternalRegion & getOrInsertRegion(TableID table_id, RegionID region_id);
+
+    /// This function only shrinks the table range of this region_id; range expansion will (only) be done at flush.
+    /// Note that updating a region's range should not affect the data in storage.
+    void updateRegionRange(const RegionPtr & region);
+
+    bool shouldFlush(const InternalRegion & region);
+    void flushRegion(TableID table_id, RegionID region_id, size_t & rest_cache_size);
+
+public:
+    RegionTable(Context & context_, const std::string & parent_path_, std::function region_fetcher);
+    void setFlushThresholds(FlushThresholds flush_thresholds_) { flush_thresholds = std::move(flush_thresholds_); }
+
+    /// After the region is updated (insert or delete KVs).
+    void updateRegion(const RegionPtr & region, const TableIDSet & relative_table_ids);
+    /// A new region arrived via an apply-snapshot command; this function stores the region into the selected tables.
+    void applySnapshotRegion(const RegionPtr & region);
+    /// Manage data after a region splits into split_regions.
+    /// i.e. split_regions could have been assigned to other tables, so we need to move the data belonging to them.
+    void splitRegion(const RegionPtr & region, std::vector split_regions);
+    /// Remove a region from the corresponding tables.
+    void removeRegion(const RegionPtr & region);
+
+    /// Try to pick some regions and flush them.
+    /// Note that flush is organized by partition, i.e. if a region is selected to be flushed, all regions belonging to its partition will also be flushed.
+    /// This function will be called constantly by background threads.
+    /// Returns whether this function has done any meaningful job.
+    bool tryFlushRegions();
+
+    void traverseRegions(std::function callback);
+    void traverseRegionsByTable(const TableID table_id, std::function callback);
+
+    std::tuple getBlockInputStreamByRegion(TableID table_id,
+        const RegionID region_id,
+        const RegionVersion region_version,
+        const TiDB::TableInfo & table_info,
+        const ColumnsDescription & columns,
+        const Names & ordered_columns,
+        bool learner_read,
+        bool resolve_locks,
+        UInt64 start_ts,
+        std::vector * keys = nullptr);
+
+    // For debug
+    void dumpRegionMap(RegionTable::RegionMap & res);
+    void dropRegionsInTable(TableID table_id);
+};
+
+using RegionPartitionPtr = std::shared_ptr;
+
+} // namespace DB
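One detail worth noting in the header above: Table::Write serializes only the region id and Table::Read rebuilds a fresh InternalRegion from it, so the runtime flags (pause_flush, must_flush, updated) and cache_bytes are intentionally not persisted; the constructor recomputes them from KVStore on restart. Below is a minimal stand-alone sketch of that "persist keys only, rebuild the rest" round-trip, using std::stringstream in place of the project's WriteBuffer/ReadBuffer (those substitutions are assumptions, not the real API).

#include <cstdint>
#include <iostream>
#include <sstream>
#include <unordered_set>

using RegionID = uint64_t;

// Persist only the region ids; all other per-region state is rebuilt on restore.
void persist(const std::unordered_set<RegionID> & region_ids, std::ostream & out)
{
    for (RegionID id : region_ids)
        out.write(reinterpret_cast<const char *>(&id), sizeof(id));
}

std::unordered_set<RegionID> restore(std::istream & in)
{
    std::unordered_set<RegionID> region_ids;
    RegionID id;
    while (in.read(reinterpret_cast<char *>(&id), sizeof(id)))
        region_ids.insert(id);
    return region_ids;
}

int main()
{
    std::stringstream buf;
    persist({1, 2, 3}, buf);
    std::cout << restore(buf).size() << '\n'; // prints 3
}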
diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp
index 749a7a04f9b..a3e00a77c81 100644
--- a/dbms/src/Storages/Transaction/TMTContext.cpp
+++ b/dbms/src/Storages/Transaction/TMTContext.cpp
@@ -7,13 +7,17 @@ namespace DB
 {
 TMTContext::TMTContext(Context & context, std::vector addrs)
-    : kvstore(std::make_shared(context.getPath() + "kvstore/", &context)),
-      region_partition(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)),
+    : kvstore(std::make_shared(context.getPath() + "kvstore/", &context, &regions_to_remove)),
+      region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)),
       schema_syncer(std::make_shared()),
       pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) : static_cast(new pingcap::pd::Client(addrs))),
       region_cache(std::make_shared(pd_client))
-{}
+{
+    for (RegionID id : regions_to_remove)
+        kvstore->removeRegion(id, &context);
+    regions_to_remove.clear();
+}
 
 SchemaSyncerPtr TMTContext::getSchemaSyncer() const
 {
diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h
index b4bb7418675..dd6a847d3c7 100644
--- a/dbms/src/Storages/Transaction/TMTContext.h
+++ b/dbms/src/Storages/Transaction/TMTContext.h
@@ -4,7 +4,7 @@
 #include
 #include
-#include
+#include
 #include
 #include
@@ -18,7 +18,7 @@ class TMTContext
 public:
     KVStorePtr kvstore;
     TMTStorages storages;
-    RegionPartition region_partition;
+    RegionTable region_table;
 
 public:
     // TODO: get flusher args from config file
@@ -35,6 +35,8 @@ class TMTContext
     pingcap::kv::RpcClientPtr getRpcClient();
 
 private:
+    std::vector regions_to_remove;
+
     SchemaSyncerPtr schema_syncer;
     pingcap::pd::ClientPtr pd_client;
     pingcap::kv::RegionCachePtr region_cache;
diff --git a/dbms/src/Storages/Transaction/TiKVKeyValue.h b/dbms/src/Storages/Transaction/TiKVKeyValue.h
index 5e701d87a49..cdabe102cd6 100644
--- a/dbms/src/Storages/Transaction/TiKVKeyValue.h
+++ b/dbms/src/Storages/Transaction/TiKVKeyValue.h
@@ -39,6 +39,18 @@ struct StringObject
     StringObject() = default;
     explicit StringObject(std::string && str_) : str(std::move(str_)) {}
     explicit StringObject(const std::string & str_) : str(str_) {}
+    StringObject(StringObject && obj) : str(std::move(obj.str)) {}
+    StringObject(const StringObject & obj) : str(obj.str) {}
+    StringObject & operator=(const StringObject & a)
+    {
+        str = a.str;
+        return *this;
+    }
+    StringObject & operator=(StringObject && a)
+    {
+        str = std::move(a.str);
+        return *this;
+    }
 
     const std::string & getStr() const { return str; }
     std::string & getStrRef() { return str; }
@@ -69,7 +81,7 @@ struct StringObject
     size_t serialize(WriteBuffer & buf) const { return writeBinary2(str, buf); }
 
-    static T deserialize(ReadBuffer & buf) { return T {readBinary2(buf)}; }
+    static T deserialize(ReadBuffer & buf) { return T(readBinary2(buf)); }
 
 private:
     std::string str;
diff --git a/dbms/src/Storages/Transaction/Types.h b/dbms/src/Storages/Transaction/Types.h
index f313abf3add..67588aa44db 100644
--- a/dbms/src/Storages/Transaction/Types.h
+++ b/dbms/src/Storages/Transaction/Types.h
@@ -40,6 +40,13 @@ enum : RegionID
     InvalidRegionID = 0
 };
 
+using RegionVersion = UInt64;
+
+enum : RegionVersion
+{
+    InvalidRegionVersion = std::numeric_limits::max()
+};
+
 using Clock = std::chrono::system_clock;
 using Timepoint = Clock::time_point;
 using Duration = Clock::duration;
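Types.h above adds RegionVersion together with an InvalidRegionVersion sentinel (the maximum value). RegionTable::flushRegion passes it to getBlockInputStreamByRegion, presumably meaning "do not check the version", and a mismatch is what RegionReadStatus::VERSION_ERROR reports. A small illustrative check of how such a sentinel is typically consumed (the function name below is made up for this sketch and does not appear in the patch):

#include <cstdint>
#include <limits>

using RegionVersion = uint64_t;
constexpr RegionVersion InvalidRegionVersion = std::numeric_limits<RegionVersion>::max();

// True when the caller asked for no version check, or when the versions agree.
constexpr bool versionMatches(RegionVersion requested, RegionVersion current)
{
    return requested == InvalidRegionVersion || requested == current;
}

static_assert(versionMatches(InvalidRegionVersion, 42), "sentinel skips the check");
static_assert(!versionMatches(41, 42), "a mismatch would surface as VERSION_ERROR in the caller");

int main() {}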
diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp
index fb3e4b03e94..de7bb419f5a 100644
--- a/dbms/src/Storages/Transaction/applySnapshot.cpp
+++ b/dbms/src/Storages/Transaction/applySnapshot.cpp
@@ -37,16 +37,28 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)
         if (!request.has_data())
             throw Exception("Failed to read snapshot data", ErrorCodes::LOGICAL_ERROR);
         const auto & data = request.data();
-        data.cf();
-        for (const auto & kv : data.data())
-            region->insert(data.cf(), TiKVKey{kv.key()}, TiKVValue{kv.value()});
+
+        {
+            auto cf_data = data.data();
+            auto it = cf_data.begin();
+            auto cf_name = data.cf();
+            auto key = TiKVKey();
+            auto value = TiKVValue();
+            region->batchInsert([&](Region::BatchInsertNode & node) -> bool {
+                if (it == cf_data.end())
+                    return false;
+                key = TiKVKey(it->key());
+                value = TiKVValue(it->value());
+                node = Region::BatchInsertNode(&key, &value, &cf_name);
+                ++it;
+                return true;
+            });
+
+        }
     }
 
     // context may be null in test cases.
-    if (context)
-        kvstore->onSnapshot(region, context);
-    else
-        kvstore->onSnapshot(region, nullptr);
+    kvstore->onSnapshot(region, context);
 
     LOG_INFO(log, "Region " << region->id() << " apply snapshot done.");
 }
diff --git a/dbms/src/Storages/Transaction/tests/kvstore.cpp b/dbms/src/Storages/Transaction/tests/kvstore.cpp
index cdd1096034f..80dc1ca3e65 100644
--- a/dbms/src/Storages/Transaction/tests/kvstore.cpp
+++ b/dbms/src/Storages/Transaction/tests/kvstore.cpp
@@ -257,12 +257,12 @@ int main(int, char **)
     }
 
     {
-        kvstore->tryPersistAndReport(context);
+        kvstore->tryPersistAndReport(context, Seconds(0), Seconds(0));
 
         {
            auto kvstore2 = std::make_shared(dir_path);
 
-            const auto & regions1 = kvstore->_regions();
-            const auto & regions2 = kvstore2->_regions();
+            const auto & regions1 = kvstore->getRegions();
+            const auto & regions2 = kvstore2->getRegions();
            for (auto && [region_id1, region1] : regions1)
            {
                auto it = regions2.find(region_id1);
@@ -299,9 +299,9 @@ int main(int, char **)
 
        kvstore->onServiceCommand(cmds, context);
 
-        ASSERT_CHECK_EQUAL(1, kvstore->_regions().size(), suc);
+        ASSERT_CHECK_EQUAL(1, kvstore->getRegions().size(), suc);
 
-        kvstore->tryPersistAndReport(context);
+        kvstore->tryPersistAndReport(context, Seconds(0), Seconds(0));
     }
 
     return suc ? 0 : 1;
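Finally, the applySnapshot change above switches from per-key Region::insert calls to a single Region::batchInsert that pulls key/value pairs through a callback until the callback returns false. The stand-alone sketch below shows that pull-style protocol with generic names (KV and the local batchInsert are stand-ins for illustration; the real signature is the Region::BatchInsertNode one visible in the diff).

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Consumer side: keep asking the producer for the next item until it reports exhaustion.
void batchInsert(const std::function<bool(KV &)> & next, std::vector<KV> & storage)
{
    KV kv;
    while (next(kv))
        storage.push_back(kv);
}

int main()
{
    const std::vector<KV> snapshot = {{"k1", "v1"}, {"k2", "v2"}};
    std::vector<KV> region_data;

    auto it = snapshot.begin();
    batchInsert(
        [&](KV & out) -> bool {
            if (it == snapshot.end())
                return false; // end of the snapshot data
            out = *it++;
            return true;
        },
        region_data);

    std::cout << region_data.size() << '\n'; // prints 2
}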