[FLASH-541] Read ranges indicated by mvcc_query_info (#267)
* Read ranges indicated by mvcc_query_info

* Optimization: Don't do handle range filter inside Segment
JaySon-Huang committed Oct 22, 2019
1 parent ba31e32 commit afddd22
Showing 12 changed files with 380 additions and 234 deletions.
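At a glance, the change threads a per-region handle range through mvcc_query_info so the storage layer reads only the ranges a query actually covers. A sketch of the shape being populated (abridged; only fields visible in the diffs below are listed, and the HandleRange type of range_in_table is an assumption):

// Assumed shape of the per-region query info filled in below; the real struct
// likely carries more fields (e.g. the region id).
struct RegionQueryInfo
{
    UInt64      version;        // region epoch version
    UInt64      conf_version;   // region epoch conf version
    HandleRange range_in_table; // new: the region's key range trimmed to this table
};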
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -37,6 +37,7 @@
 #include <Storages/Transaction/SchemaSyncer.h>
 #include <Storages/Transaction/TiKVRange.h>
 #include <Storages/Transaction/TMTContext.h>
+#include <Storages/Transaction/RegionRangeKeys.h>
 
 #include <Storages/IStorage.h>
 #include <Storages/StorageMergeTree.h>
@@ -793,6 +794,14 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
             const auto & epoch = region.region_epoch();
             info.version = epoch.version();
             info.conf_version = epoch.conf_ver();
+            if (const auto & managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage))
+            {
+                // Extract the handle range of the current table from the region's key range
+                TiKVKey start_key = RecordKVFormat::encodeAsTiKVKey(region.start_key());
+                TiKVKey end_key   = RecordKVFormat::encodeAsTiKVKey(region.end_key());
+                RegionRangeKeys region_range(std::move(start_key), std::move(end_key));
+                info.range_in_table = region_range.getHandleRangeByTable(managed_storage->getTableInfo().id);
+            }
             query_info.mvcc_query_info->regions_query_info.push_back(info);
         }
 
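Read as a standalone helper, the added branch re-encodes the region's raw boundaries as TiKV keys, wraps them in RegionRangeKeys, and trims them to the handle range of the queried table. A minimal sketch using only the calls shown above (the wrapper function and its region parameter type are hypothetical):

// Hypothetical wrapper around the logic added above. `RegionT` stands in for
// whatever type exposes start_key()/end_key(); getHandleRangeByTable is assumed
// to return a HandleRange.
template <typename RegionT>
HandleRange handleRangeOfRegionInTable(const RegionT & region, TableID table_id)
{
    TiKVKey start_key = RecordKVFormat::encodeAsTiKVKey(region.start_key());
    TiKVKey end_key   = RecordKVFormat::encodeAsTiKVKey(region.end_key());
    RegionRangeKeys region_range(std::move(start_key), std::move(end_key));
    return region_range.getHandleRangeByTable(table_id);
}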
147 changes: 18 additions & 129 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
@@ -10,163 +10,52 @@ namespace DB
 {
 namespace DM
 {
-/// Read `chunks` as blocks.
-/// We can use the `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly.
-///
-/// Note that the `handle_range` param assumes that data in chunks are ordered by handle. If not, please use a handle range of {MIN, MAX}.
-///
-/// For example:
-///    size_t skip_rows = 0;
-///    while (stream.hasNext())
-///    {
-///        if (stream.shouldSkipNext())
-///        {
-///            skip_rows += stream.nextRows();
-///            stream.skipNext();
-///            continue;
-///        }
-///        auto block = stream.read();
-///        ...
-///    }
 class ChunkBlockInputStream final : public IBlockInputStream
 {
 public:
     ChunkBlockInputStream(const Chunks &        chunks_,
-                          size_t                handle_col_pos_,
-                          const HandleRange &   handle_range_,
                           const ColumnDefines & read_columns_,
                           const PageReader &    page_reader_,
-                          const RSOperatorPtr & filter_)
-        : chunks(chunks_),
-          handle_col_pos(handle_col_pos_),
-          handle_range(handle_range_),
-          read_columns(read_columns_),
-          page_reader(page_reader_),
-          filter(filter_)
+                          const RSOperatorPtr & filter)
+        : chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
     {
-    }
-
-    String getName() const override { return "Chunk"; }
-    Block  getHeader() const override { return toEmptyBlock(read_columns); }
-
-    Block read() override
-    {
-        if (!hasNext())
-            return {};
-        Block tmp;
-        if (!cur_chunk_data)
-            // The user ignored the skipNext() hint and insists on reading data.
-            tmp = readCurChunkData();
-        else
-            tmp.swap(cur_chunk_data);
-
-        ++cur_chunk_index;
-        cur_chunk_skip = false;
-
-        return tmp;
-    }
-
-    bool hasNext()
-    {
-        if (cur_chunk_index >= chunks.size())
-            return false;
-        // Skip chunks that do not intersect handle_range.
-        for (; cur_chunk_index < chunks.size(); ++cur_chunk_index)
-        {
-            auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast();
-            if (handle_range.intersect(first, last))
-                break;
-        }
-
-        if (cur_chunk_index >= chunks.size())
-            return false;
-
-        if (!cur_chunk_data)
+        if (filter)
         {
-            if (filter)
+            for (size_t i = 0; i < chunks.size(); ++i)
             {
-                auto &       chunk = chunks[cur_chunk_index];
+                auto &       chunk = chunks[i];
                 RSCheckParam param;
                 for (auto & [col_id, meta] : chunk.getMetas())
                     param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));
 
-                cur_chunk_skip = filter->roughCheck(param) == None;
-            }
-            if (!cur_chunk_skip)
-            {
-                cur_chunk_data = readCurChunkData();
+                skip_chunks[i] = filter->roughCheck(param) == None;
             }
         }
-
-        return true;
     }
 
-    size_t nextRows()
-    {
-        auto & chunk = chunks[cur_chunk_index];
-        if (isCurChunkCompleted(chunk))
-            return chunk.getRows();
-
-        // Otherwise, some rows of the current chunk are filtered out by handle_range.
-
-        if (cur_chunk_data)
-        {
-            return cur_chunk_data.rows();
-        }
-        else
-        {
-            // The current chunk is ignored by `filter`,
-            // but we still need the count of rows whose handles fall within handle_range.
-            auto block = readChunk(chunk, {read_columns[handle_col_pos]}, page_reader);
-            auto offset_limit
-                = HandleFilter::getPosRangeOfSorted(handle_range, block.getByPosition(handle_col_pos).column, 0, block.rows());
-            return offset_limit.second;
-        }
-    }
-
-    bool shouldSkipNext() { return cur_chunk_skip; }
+    String getName() const override { return "Chunk"; }
+    Block  getHeader() const override { return toEmptyBlock(read_columns); }
 
-    void skipNext()
+    Block read() override
     {
-        ++cur_chunk_index;
-
-        cur_chunk_data = {};
-        cur_chunk_skip = false;
+        if (!hasNext())
+            return {};
+        return readChunk(chunks[cur_chunk_index++], read_columns, page_reader);
     }
 
-private:
-    inline bool isCurChunkCompleted(const Chunk & chunk)
-    {
-        auto [first, last] = chunk.getHandleFirstLast();
-        return handle_range.include(first, last);
-    }
+    bool   hasNext() { return cur_chunk_index < chunks.size(); }
+    size_t nextRows() { return chunks[cur_chunk_index].getRows(); }
 
-    inline Block readCurChunkData()
-    {
-        auto & chunk = chunks[cur_chunk_index];
-        if (isCurChunkCompleted(chunk))
-        {
-            return readChunk(chunk, read_columns, page_reader);
-        }
-        else
-        {
-            auto block = readChunk(chunk, read_columns, page_reader);
-            return HandleFilter::filterSorted(handle_range, std::move(block), handle_col_pos);
-        }
-    }
+    bool shouldSkipNext() { return skip_chunks[cur_chunk_index]; }
+    void skipNext() { ++cur_chunk_index; }
 
 private:
-    Chunks      chunks;
-    size_t      handle_col_pos;
-    HandleRange handle_range;
+    Chunks             chunks;
+    std::vector<UInt8> skip_chunks;
 
     ColumnDefines read_columns;
     PageReader    page_reader;
-    RSOperatorPtr filter;
 
     size_t cur_chunk_index = 0;
-    bool   cur_chunk_skip  = false;
-    Block  cur_chunk_data;
 };
 
 using ChunkBlockInputStreamPtr = std::shared_ptr<ChunkBlockInputStream>;
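The usage pattern from the deleted header comment still applies to the slimmed-down class; a sketch of the driving loop (note that nextRows() is now simply the chunk's full row count, since row-level handle filtering happens outside this stream):

// Driving loop for the new interface; `stream` is assumed to be a
// ChunkBlockInputStream constructed with a rough-set filter.
size_t skipped_rows = 0;
while (stream.hasNext())
{
    if (stream.shouldSkipNext())
    {
        // The whole chunk was ruled out by filter->roughCheck() in the constructor.
        skipped_rows += stream.nextRows();
        stream.skipNext();
        continue;
    }
    Block block = stream.read();
    // ... consume block ...
}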
26 changes: 5 additions & 21 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
@@ -28,9 +28,6 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
     using DeltaValueSpacePtr = std::shared_ptr<DeltaValueSpace>;
     using SharedLock         = std::shared_lock<std::shared_mutex>;
 
-    size_t      handle_column_pos;
-    HandleRange handle_range;
-
     ChunkBlockInputStreamPtr stable_input_stream;
     ChunkBlockInputStream *  stable_input_stream_raw_ptr;
 
@@ -59,16 +56,12 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
     bool delta_done = false;
 
 public:
-    DeltaMergeBlockInputStream(size_t                           handle_column_pos_,
-                               const HandleRange &              handle_range_,
-                               const ChunkBlockInputStreamPtr & stable_input_stream_,
+    DeltaMergeBlockInputStream(const ChunkBlockInputStreamPtr & stable_input_stream_,
                                const DeltaValueSpacePtr &       delta_value_space_,
                                IndexIterator                    index_begin,
                                IndexIterator                    index_end,
                                size_t                           max_block_size_)
-        : handle_column_pos(handle_column_pos_),
-          handle_range(handle_range_),
-          stable_input_stream(stable_input_stream_),
+        : stable_input_stream(stable_input_stream_),
           stable_input_stream_raw_ptr(stable_input_stream.get()),
           delta_value_space(delta_value_space_),
           entry_it(index_begin),
@@ -125,13 +118,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
             if (limit == max_block_size)
                 continue;
 
-            Block block = header.cloneWithColumns(std::move(columns));
-
-            Block res = HandleFilter::filterSorted(handle_range, std::move(block), handle_column_pos);
-            if (!res || !res.rows())
-                continue;
-            else
-                return res;
+            return header.cloneWithColumns(std::move(columns));
         }
         return {};
     }
@@ -160,11 +147,8 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
                     writeDeleteFromDelta(1);
                     break;
                 case DT_INS:
-                    if (handle_range.check(delta_value_space->getHandle(tuple_id)))
-                    {
-                        writeInsertFromDelta(output_columns, tuple_id);
-                        --output_write_limit;
-                    }
+                    writeInsertFromDelta(output_columns, tuple_id);
+                    --output_write_limit;
                     break;
                 default:
                     throw Exception("Entry type " + DTTypeString(entry_it.getType()) + " is not supported, is end: "
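With both the per-block HandleFilter::filterSorted call and the per-tuple handle_range.check gone, the merge stream no longer knows about handle ranges at all; construction drops the two handle parameters. A sketch of the assumed new call shape (variable names hypothetical):

// Building the stream under the new signature; when range filtering is needed,
// it is layered on top of the result (see DMHandleFilterBlockInputStream below).
BlockInputStreamPtr merged = std::make_shared<DeltaMergeBlockInputStream>(
    stable_stream,     // ChunkBlockInputStreamPtr
    delta_value_space, // DeltaValueSpacePtr
    index_begin,       // IndexIterator
    index_end,
    max_block_size);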
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
@@ -630,7 +630,7 @@ DiskValueSpacePtr DiskValueSpace::doFlushCache(const OpContext & context, WriteB
 
 ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const
 {
-    return std::make_shared<ChunkBlockInputStream>(chunks, 0, HandleRange::newAll(), read_columns, page_reader, RSOperatorPtr());
+    return std::make_shared<ChunkBlockInputStream>(chunks, read_columns, page_reader, RSOperatorPtr());
 }
 
 size_t DiskValueSpace::num_rows() const
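The two dropped arguments (the handle column position and HandleRange::newAll()) reflect the same split of responsibilities: the chunk stream just reads, and any range restriction is layered on top. A sketch under that assumption (variable names hypothetical; <false> chosen on the assumption that these blocks are not sorted by handle):

// A caller that still needs a handle-range restriction wraps the plain stream
// with the filter stream from HandleFilter.h (shown below).
auto chunk_stream = value_space.getInputStream(read_columns, page_reader);
BlockInputStreamPtr filtered = std::make_shared<DMHandleFilterBlockInputStream<false>>(
    chunk_stream, handle_range, /* handle_col_pos = */ 0);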
7 changes: 3 additions & 4 deletions dbms/src/Storages/DeltaMerge/HandleFilter.h
@@ -83,14 +83,14 @@ inline Block filterUnsorted(const HandleRange & handle_range, Block && block, si
 }
 } // namespace HandleFilter
 
+template <bool is_block_sorted>
 class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
 {
 public:
     DMHandleFilterBlockInputStream(const BlockInputStreamPtr & input,
                                    HandleRange                 handle_range_,
-                                   size_t                      handle_col_pos_,
-                                   bool                        is_block_sorted_)
-        : handle_range(handle_range_), handle_col_pos(handle_col_pos_), is_block_sorted(is_block_sorted_)
+                                   size_t                      handle_col_pos_)
+        : handle_range(handle_range_), handle_col_pos(handle_col_pos_)
     {
         children.push_back(input);
     }
@@ -120,7 +120,6 @@ class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
 private:
     HandleRange handle_range;
     size_t      handle_col_pos;
-    bool        is_block_sorted;
 };
 
 } // namespace DM
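Turning is_block_sorted into a template parameter moves the sorted/unsorted dispatch from a per-stream runtime flag to compile time, so each instantiation keeps only the branch it needs (filterSorted for sorted blocks, filterUnsorted otherwise). A sketch of the assumed call sites:

// The specialization is now chosen at compile time.
BlockInputStreamPtr sorted_filtered
    = std::make_shared<DMHandleFilterBlockInputStream<true>>(input, handle_range, /* handle_col_pos = */ 0);
BlockInputStreamPtr unsorted_filtered
    = std::make_shared<DMHandleFilterBlockInputStream<false>>(input, handle_range, /* handle_col_pos = */ 0);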