
Commit

This is an automated cherry-pick of pingcap#7529
Signed-off-by: ti-chi-bot <[email protected]>
Lloyd-Pottiger authored and ti-chi-bot committed May 23, 2023
1 parent 0b00195 commit fcae41b
Showing 4 changed files with 347 additions and 0 deletions.
dbms/src/Flash/Coprocessor/TiDBTableScan.cpp: 3 additions & 0 deletions
@@ -29,6 +29,8 @@ TiDBTableScan::TiDBTableScan(
, keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order()))
, is_fast_scan(table_scan->tbl_scan().is_fast_scan())
{
RUNTIME_CHECK_MSG(!keep_order || pushed_down_filters.empty(), "Bad TiDB table scan executor: push down filter is not empty when keep order is true");

if (is_partition_table_scan)
{
if (table_scan->partition_table_scan().has_table_id())
@@ -75,6 +77,7 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
for (auto id : partition_table_scan.primary_prefix_column_ids())
tipb_table_scan->add_primary_prefix_column_ids(id);
tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan());
tipb_table_scan->set_keep_order(false);
}
else
{
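For reference, a minimal sketch (not part of the diff) paraphrasing the keep_order derivation from the constructor hunk above; deriveKeepOrder and its boolean parameters are hypothetical stand-ins for the tipb::TableScan accessors.

#include <cassert>

// Mirrors the initializer of TiDBTableScan::keep_order shown above:
// order is kept only for a non-partition table scan, and an absent flag defaults to true.
bool deriveKeepOrder(bool is_partition_table_scan, bool has_keep_order, bool keep_order_flag)
{
    return !is_partition_table_scan && (keep_order_flag || !has_keep_order);
}

int main()
{
    // A partition table scan never keeps order on the TiFlash side ...
    assert(!deriveKeepOrder(/*is_partition_table_scan=*/true, /*has_keep_order=*/true, /*keep_order_flag=*/true));
    // ... while leaving the field unset would be read back as keep_order == true,
    // which is why constructTableScanForRemoteRead now sets keep_order to false explicitly.
    assert(deriveKeepOrder(/*is_partition_table_scan=*/false, /*has_keep_order=*/false, /*keep_order_flag=*/false));
    return 0;
}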
dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp: 100 additions & 0 deletions
@@ -976,7 +976,107 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index,
<<<<<<< HEAD
const ScanContextPtr & scan_context)
=======
ScanContextPtr scan_context)
{
// Use the id from MPP/Coprocessor level as tracing_id
auto dm_context = newDMContext(db_context, db_settings, tracing_id, scan_context);

// If keep order is required, disable read thread.
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
// SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
// Also, too many read tasks of a segment with different small ranges are not good for the data sharing cache.
SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments, /*try_split_task =*/!enable_read_thread);
auto log_tracing_id = getLogTracingId(*dm_context);
auto tracing_logger = log->getChild(log_tracing_id);
LOG_INFO(tracing_logger,
"Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} is_fast_scan={} is_push_down_filter_empty={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr);

auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) {
// TODO: Update the tracing_id before checkSegmentUpdate?
this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read);
};

GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size());
size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
extra_table_id_index,
dm_context,
columns_to_read,
filter,
max_version,
expected_block_size,
read_mode,
std::move(tasks),
after_segment_read,
log_tracing_id,
enable_read_thread,
final_num_stream);

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
{
BlockInputStreamPtr stream;
if (enable_read_thread)
{
stream = std::make_shared<UnorderedInputStream>(
read_task_pool,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
extra_table_id_index,
log_tracing_id);
}
else
{
stream = std::make_shared<DMSegmentThreadInputStream>(
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
filter,
max_version,
expected_block_size,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
log_tracing_id);
stream = std::make_shared<AddExtraTableIDColumnInputStream>(
stream,
extra_table_id_index,
physical_table_id);
}
res.push_back(stream);
}
LOG_DEBUG(tracing_logger, "Read create stream done");

return res;
}

void DeltaMergeStore::read(
PipelineExecutorStatus & exec_status,
PipelineExecGroupBuilder & group_builder,
const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
const PushDownFilterPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_scan,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index,
ScanContextPtr scan_context)
>>>>>>> efeb0c122f (fix keep order in table scan is lost when remote read of PartitionTableScan (#7529))
{
// Use the id from MPP/Coprocessor level as tracing_id
auto dm_context = newDMContext(db_context, db_settings, tracing_id, scan_context);
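As a minimal sketch assuming only the expressions visible in the hunk above, this is how the read-thread decision and stream-count arithmetic in DeltaMergeStore::read combine; the local variables are illustrative stand-ins for the settings, arguments, and task pool.

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    // Stand-ins for db_context.getSettingsRef().dt_enable_read_thread and the keep_order argument.
    bool dt_enable_read_thread = true;
    bool keep_order = true;

    // Keeping order forces the non-read-thread path (DMSegmentThreadInputStream),
    // even when the dt_enable_read_thread setting is on.
    bool enable_read_thread = dt_enable_read_thread && !keep_order; // false here

    // One stream per read task, capped by the requested stream count, but at least one.
    size_t num_streams = 4;
    size_t num_tasks = 10; // tasks.size() from getReadTasksByRanges
    size_t final_num_stream = std::max<size_t>(1, std::min(num_streams, num_tasks)); // 4

    std::cout << "enable_read_thread=" << enable_read_thread
              << " final_num_stream=" << final_num_stream << '\n';
    return 0;
}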
dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp: 156 additions & 0 deletions
@@ -0,0 +1,156 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnsCommon.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/LateMaterializationBlockInputStream.h>
#include <Storages/DeltaMerge/ReadUtil.h>

#include <algorithm>

namespace DB::DM
{

LateMaterializationBlockInputStream::LateMaterializationBlockInputStream(
const ColumnDefines & columns_to_read,
const String & filter_column_name_,
BlockInputStreamPtr filter_column_stream_,
SkippableBlockInputStreamPtr rest_column_stream_,
const BitmapFilterPtr & bitmap_filter_,
const String & req_id_)
: header(toEmptyBlock(columns_to_read))
, filter_column_name(filter_column_name_)
, filter_column_stream(filter_column_stream_)
, rest_column_stream(rest_column_stream_)
, bitmap_filter(bitmap_filter_)
, log(Logger::get(NAME, req_id_))
{}

Block LateMaterializationBlockInputStream::readImpl()
{
Block filter_column_block;
FilterPtr filter = nullptr;

// Loop until a non-empty block remains after filtering, or the stream ends.
while (true)
{
filter_column_block = filter_column_stream->read(filter, true);

// If filter_column_block is empty, it means that the stream has ended.
// No need to read the rest_column_stream, just return an empty block.
if (!filter_column_block)
return filter_column_block;

// If filter is nullptr, it means that these push down filters are always true.
if (!filter)
{
IColumn::Filter col_filter;
col_filter.resize(filter_column_block.rows());
Block rest_column_block;
if (bitmap_filter->get(col_filter, filter_column_block.startOffset(), filter_column_block.rows()))
{
rest_column_block = rest_column_stream->read();
}
else
{
rest_column_block = rest_column_stream->read();
size_t passed_count = countBytesInFilter(col_filter);
for (auto & col : rest_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
for (auto & col : filter_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
}
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}

size_t rows = filter_column_block.rows();
// bitmap_filter[start_offset, start_offset + rows] & filter -> filter
bitmap_filter->rangeAnd(*filter, filter_column_block.startOffset(), rows);

if (size_t passed_count = countBytesInFilter(*filter); passed_count == 0)
{
// if all rows are filtered, skip the next block of rest_column_stream
if (size_t skipped_rows = rest_column_stream->skipNextBlock(); skipped_rows == 0)
{
// if we fail to skip, we still need to call read() on rest_column_stream, but ignore the result
// NOTE: skipNextBlock() returns 0 only if it fails to skip or reaches the end of the stream;
// since the filter_column_stream has not reached the end of the stream,
// this is unexpected behavior.
rest_column_stream->read();
LOG_ERROR(log, "Late materialization skip block failed, at start_offset: {}, rows: {}", filter_column_block.startOffset(), filter_column_block.rows());
}
}
else
{
Block rest_column_block;
auto filter_out_count = rows - passed_count;
if (filter_out_count >= DEFAULT_MERGE_BLOCK_SIZE * 2)
{
// When DEFAULT_MERGE_BLOCK_SIZE < row_left < DEFAULT_MERGE_BLOCK_SIZE * 2,
// the possibility of skipping a pack in the next block is quite small (less than 1%),
// and reading and then filtering performs better than readWithFilter.
// So only when the number of rows filtered out is large enough to skip
// some packs of the next block do we call readWithFilter to get the next block.
rest_column_block = rest_column_stream->readWithFilter(*filter);
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
}
else if (filter_out_count > 0)
{
// if the number of rows filtered out is small, we can't skip any packs of the next block,
// so we call read() to get the next block and then filter it.
rest_column_block = rest_column_stream->read();
for (auto & col : rest_column_block)
{
col.column = col.column->filter(*filter, passed_count);
}
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
}
else
{
// if all rows are passed, just read the next block of rest_column_stream
rest_column_block = rest_column_stream->read();
}

// make sure the position and size of filter_column_block and rest_column_block are the same
RUNTIME_CHECK_MSG(rest_column_block.startOffset() == filter_column_block.startOffset(),
"Late materialization meets unexpected block unmatched, filter_column_block: [start_offset={}, rows={}], rest_column_block: [start_offset={}, rows={}], pass_count={}",
filter_column_block.startOffset(),
filter_column_block.rows(),
rest_column_block.startOffset(),
rest_column_block.rows(),
passed_count);
// join filter_column_block and rest_column_block by columns,
// the tmp column added by FilterBlockInputStream will be removed.
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}
}

return filter_column_block;
}

} // namespace DB::DM
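Below is a minimal sketch (assumption: BitmapFilter behaves like a plain visibility bitmap; its real interface is not shown in this diff) of what the rangeAnd step above computes: filter[i] &= bitmap[start_offset + i] for every row of the current block.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative stand-in for BitmapFilter::rangeAnd(filter, start_offset, rows).
void rangeAndSketch(const std::vector<bool> & bitmap, std::vector<uint8_t> & filter, size_t start_offset)
{
    for (size_t i = 0; i < filter.size(); ++i)
        filter[i] = filter[i] && bitmap[start_offset + i];
}

int main()
{
    std::vector<bool> bitmap = {true, true, false, true, false, true}; // segment-level visibility bitmap (contents illustrative)
    std::vector<uint8_t> filter = {1, 0, 1, 1};                        // push-down filter result for one block
    rangeAndSketch(bitmap, filter, /*start_offset=*/2);                // the block starts at row 2 of the segment
    assert((filter == std::vector<uint8_t>{0, 0, 0, 1}));
    return 0;
}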
tests/fullstack-test/issues/issue_7519.test: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Preparation.
=> DBGInvoke __init_fail_point()

mysql> drop table if exists test.t
mysql> create table test.t (x int, a varchar(50), y int, t time) partition by range (x) (partition p0 values less than (5), partition p1 values less than (10));

mysql> insert into test.t values (1, 'a', 1, '700:11:11.1234'), (2, 'b', 2, '711:12:12.1234');
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t select * from test.t;
mysql> insert into test.t values (8, 'c', 8, '500:21:21.1234');

mysql> alter table test.t set tiflash replica 1;
func> wait_table test t
mysql> analyze table test.t;

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select count(*) from test.t;
+----------+
| count(*) |
+----------+
| 16385 |
+----------+

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select * from test.t where x >= 5 and x < 10;
+------+------+------+-----------+
| x | a | y | t |
+------+------+------+-----------+
| 8 | c | 8 | 500:21:21 |
+------+------+------+-----------+

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select x, a, y, hour(t) from test.t where x >= 5 and x < 10;
+------+------+------+---------+
| x | a | y | hour(t) |
+------+------+------+---------+
| 8 | c | 8 | 500 |
+------+------+------+---------+

=> DBGInvoke __enable_fail_point(force_remote_read_for_batch_cop)

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select count(*) from test.t;
+----------+
| count(*) |
+----------+
| 16385 |
+----------+

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select * from test.t where x >= 5 and x < 10;
+------+------+------+-----------+
| x | a | y | t |
+------+------+------+-----------+
| 8 | c | 8 | 500:21:21 |
+------+------+------+-----------+

mysql> set tidb_partition_prune_mode=dynamic; set tidb_enforce_mpp=1; select x, a, y, hour(t) from test.t where x >= 5 and x < 10;
+------+------+------+---------+
| x | a | y | hour(t) |
+------+------+------+---------+
| 8 | c | 8 | 500 |
+------+------+------+---------+

=> DBGInvoke __disable_fail_point(force_remote_read_for_batch_cop)

# Clean up.
mysql> drop table if exists test.t;
