Skip to content

Commit

Permalink
FLASH-554 cop check range should be based on region range (#270)
Browse files Browse the repository at this point in the history
* only return execute summaies if requested

* cop check range should be based on region range

* address comments

* add tests

* minor improve
  • Loading branch information
windtalker authored and zanmato1984 committed Oct 10, 2019
1 parent 80f6f35 commit f255362
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 19 deletions.
25 changes: 20 additions & 5 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts,
Int64 tz_offset, const String & tz_name);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version);
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
Expand Down Expand Up @@ -86,7 +87,14 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
if (!region)
throw Exception("No such region", ErrorCodes::BAD_ARGUMENTS);
}
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region->id(), region->version(), region->confVer());

auto handle_range = region->getHandleRangeByTable(table_id);
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> key_ranges;
DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id);
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region->id(), region->version(),
region->confVer(), key_ranges);

return outputDAGResponse(context, schema, dag_response);
}
Expand Down Expand Up @@ -119,7 +127,13 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer());
auto handle_range = region->getHandleRangeByTable(table_id);
std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> key_ranges;
DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id);
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(),
region->confVer(), key_ranges);

return outputDAGResponse(context, schema, dag_response);
}
Expand Down Expand Up @@ -562,13 +576,14 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
}

tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version)
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges)
{
static Logger * log = &Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
context.setSetting("dag_planner", "optree");
tipb::SelectResponse dag_response;
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, {}, dag_response, true);
DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, std::move(key_ranges), dag_response, true);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
return dag_response;
Expand Down
64 changes: 50 additions & 14 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
{}

template <typename HandleType>
bool isAllValueCoveredByRanges(std::vector<HandleRange<HandleType>> & ranges)
bool isAllValueCoveredByRanges(std::vector<HandleRange<HandleType>> & ranges, const std::vector<HandleRange<HandleType>> & region_ranges)
{
if (ranges.empty())
return false;
std::sort(ranges.begin(), ranges.end(),
[](const HandleRange<HandleType> & a, const HandleRange<HandleType> & b) { return a.first < b.first; });

std::vector<HandleRange<HandleType>> merged_ranges;
HandleRange<HandleType> merged_range;
merged_range.first = ranges[0].first;
merged_range.second = ranges[0].second;
Expand All @@ -63,41 +64,77 @@ bool isAllValueCoveredByRanges(std::vector<HandleRange<HandleType>> & ranges)
if (merged_range.second >= ranges[i].first)
merged_range.second = merged_range.second >= ranges[i].second ? merged_range.second : ranges[i].second;
else
break;
{
merged_ranges.emplace_back(std::make_pair(merged_range.first, merged_range.second));
merged_range.first = ranges[i].first;
merged_range.second = ranges[i].second;
}
}
merged_ranges.emplace_back(std::make_pair(merged_range.first, merged_range.second));

return merged_range.first == TiKVHandle::Handle<HandleType>::normal_min && merged_range.second == TiKVHandle::Handle<HandleType>::max;
for (const auto & region_range : region_ranges)
{
bool covered = false;
for (const auto & range : merged_ranges)
{
if (region_range.first >= range.first && region_range.second <= range.second)
{
covered = true;
break;
}
}
if (!covered && region_range.second > region_range.first)
return false;
}
return true;
}

bool checkKeyRanges(const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges, TableID table_id, bool pk_is_uint64)
bool checkKeyRanges(const std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges, TableID table_id, bool pk_is_uint64,
const ImutRegionRangePtr & region_key_range)
{
if (key_ranges.empty())
return true;

std::vector<HandleRange<Int64>> scan_ranges;
std::vector<HandleRange<Int64>> handle_ranges;
for (auto & range : key_ranges)
{
TiKVRange::Handle start = TiKVRange::getRangeHandle<true>(range.first, table_id);
TiKVRange::Handle end = TiKVRange::getRangeHandle<false>(range.second, table_id);
scan_ranges.emplace_back(std::make_pair(start, end));
handle_ranges.emplace_back(std::make_pair(start, end));
}

std::vector<HandleRange<Int64>> region_handle_ranges;
auto & raw_keys = region_key_range->rawKeys();
TiKVRange::Handle region_start = TiKVRange::getRangeHandle<true>(raw_keys.first, table_id);
TiKVRange::Handle region_end = TiKVRange::getRangeHandle<false>(raw_keys.second, table_id);
region_handle_ranges.emplace_back(std::make_pair(region_start, region_end));

if (pk_is_uint64)
{
std::vector<HandleRange<UInt64>> update_ranges;
for (auto & range : scan_ranges)
std::vector<HandleRange<UInt64>> update_handle_ranges;
for (auto & range : handle_ranges)
{
const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(range);

for (int i = 0; i < n; i++)
{
update_ranges.emplace_back(new_range[i]);
update_handle_ranges.emplace_back(new_range[i]);
}
}
return isAllValueCoveredByRanges<UInt64>(update_ranges);
std::vector<HandleRange<UInt64>> update_region_handle_ranges;
for (auto & range : region_handle_ranges)
{
const auto [n, new_range] = CHTableHandle::splitForUInt64TableHandle(range);

for (int i = 0; i < n; i++)
{
update_region_handle_ranges.emplace_back(new_range[i]);
}
}
return isAllValueCoveredByRanges<UInt64>(update_handle_ranges, update_region_handle_ranges);
}
else
return isAllValueCoveredByRanges<Int64>(scan_ranges);
return isAllValueCoveredByRanges<Int64>(handle_ranges, region_handle_ranges);
}
// the flow is the same as executeFetchcolumns
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
Expand Down Expand Up @@ -206,9 +243,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
max_streams *= settings.max_streams_to_max_threads_ratio;
}

if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);

if (dag.hasSelection())
{
for (auto & condition : dag.getSelection().conditions())
Expand All @@ -235,6 +269,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
region_ids.push_back(info.region_id);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
}
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64(), current_region->getRange()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
Expand Down
24 changes: 24 additions & 0 deletions tests/mutable-test/txn_dag/key_range.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test_uint)
=> drop table if exists default.test_uint

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test_uint, 'col_1 String, col_2 UInt64','col_2')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test_uint)
=> DBGInvoke __raft_insert_row(default, test_uint, 4, 88, 'test1')
=> DBGInvoke __raft_insert_row(default, test_uint, 4, 99, 'test2')

=> DBGInvoke dag('select * from default.test_uint where col_2 >= 66')
┌─col_1─┬─col_2─┐
│ test1 │ 88 │
│ test2 │ 99 │
└───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test_uint)
=> drop table if exists default.test_uint

0 comments on commit f255362

Please sign in to comment.