Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-541] Read ranges indicated by mvcc_query_info #267

Merged
merged 2 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TiKVRange.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/RegionRangeKeys.h>

#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
Expand Down Expand Up @@ -793,6 +794,14 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
const auto & epoch = region.region_epoch();
info.version = epoch.version();
info.conf_version = epoch.conf_ver();
if (const auto & managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage))
{
// Extract the handle range according to current table
TiKVKey start_key = RecordKVFormat::encodeAsTiKVKey(region.start_key());
TiKVKey end_key = RecordKVFormat::encodeAsTiKVKey(region.end_key());
RegionRangeKeys region_range(std::move(start_key), std::move(end_key));
info.range_in_table = region_range.getHandleRangeByTable(managed_storage->getTableInfo().id);
}
query_info.mvcc_query_info->regions_query_info.push_back(info);
}

Expand Down
147 changes: 18 additions & 129 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,163 +10,52 @@ namespace DB
{
namespace DM
{
/// Read `chunks` as blocks.
/// We can use `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly.
///
/// Note that `handle_range` param assumes that data in chunks are in order of handle. If not, please use handle range of {MIN, MAX}.
///
/// For example:
/// size_t skip_rows = 0;
/// while(stream.hasNext())
/// {
/// if(stream.shouldSkipNext())
/// {
/// skip_rows += stream.nextRows();
/// stream.skipNext();
/// continue;
/// }
/// auto block = stream.read();
/// ...
/// }
class ChunkBlockInputStream final : public IBlockInputStream
{
public:
ChunkBlockInputStream(const Chunks & chunks_,
size_t handle_col_pos_,
const HandleRange & handle_range_,
const ColumnDefines & read_columns_,
const PageReader & page_reader_,
const RSOperatorPtr & filter_)
: chunks(chunks_),
handle_col_pos(handle_col_pos_),
handle_range(handle_range_),
read_columns(read_columns_),
page_reader(page_reader_),
filter(filter_)
const RSOperatorPtr & filter)
: chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
{
}

String getName() const override { return "Chunk"; }
Block getHeader() const override { return toEmptyBlock(read_columns); }

Block read() override
{
if (!hasNext())
return {};
Block tmp;
if (!cur_chunk_data)
// It means user ignore the skipNext() result and insist to read data.
tmp = readCurChunkData();
else
tmp.swap(cur_chunk_data);

++cur_chunk_index;
cur_chunk_skip = false;

return tmp;
}

bool hasNext()
{
if (cur_chunk_index >= chunks.size())
return false;
// Filter out those rows not fit for handle_range.
for (; cur_chunk_index < chunks.size(); ++cur_chunk_index)
{
auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast();
if (handle_range.intersect(first, last))
break;
}

if (cur_chunk_index >= chunks.size())
return false;

if (!cur_chunk_data)
if (filter)
{
if (filter)
for (size_t i = 0; i < chunks.size(); ++i)
{
auto & chunk = chunks[cur_chunk_index];
auto & chunk = chunks[i];
RSCheckParam param;
for (auto & [col_id, meta] : chunk.getMetas())
param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));

cur_chunk_skip = filter->roughCheck(param) == None;
}
if (!cur_chunk_skip)
{
cur_chunk_data = readCurChunkData();
skip_chunks[i] = filter->roughCheck(param) == None;
}
}

return true;
}

size_t nextRows()
{
auto & chunk = chunks[cur_chunk_index];
if (isCurChunkCompleted(chunk))
return chunk.getRows();

// Otherwise, some rows of current chunk are filtered out by handle_range.

if (cur_chunk_data)
{
return cur_chunk_data.rows();
}
else
{
// Current chunk is ignored by `filter`,
// but we still need to get the row count which their handles are included by handle_range.
auto block = readChunk(chunk, {read_columns[handle_col_pos]}, page_reader);
auto offset_limit
= HandleFilter::getPosRangeOfSorted(handle_range, block.getByPosition(handle_col_pos).column, 0, block.rows());
return offset_limit.second;
}
}

bool shouldSkipNext() { return cur_chunk_skip; }
String getName() const override { return "Chunk"; }
Block getHeader() const override { return toEmptyBlock(read_columns); }

void skipNext()
Block read() override
{
++cur_chunk_index;

cur_chunk_data = {};
cur_chunk_skip = false;
if (!hasNext())
return {};
return readChunk(chunks[cur_chunk_index++], read_columns, page_reader);
}

private:
inline bool isCurChunkCompleted(const Chunk & chunk)
{
auto [first, last] = chunk.getHandleFirstLast();
return handle_range.include(first, last);
}
bool hasNext() { return cur_chunk_index < chunks.size(); }
size_t nextRows() { return chunks[cur_chunk_index].getRows(); }

inline Block readCurChunkData()
{
auto & chunk = chunks[cur_chunk_index];
if (isCurChunkCompleted(chunk))
{
return readChunk(chunk, read_columns, page_reader);
}
else
{
auto block = readChunk(chunk, read_columns, page_reader);
return HandleFilter::filterSorted(handle_range, std::move(block), handle_col_pos);
}
}
bool shouldSkipNext() { return skip_chunks[cur_chunk_index]; }
void skipNext() { ++cur_chunk_index; }

private:
Chunks chunks;
size_t handle_col_pos;
HandleRange handle_range;
Chunks chunks;
std::vector<UInt8> skip_chunks;

ColumnDefines read_columns;
PageReader page_reader;
RSOperatorPtr filter;

size_t cur_chunk_index = 0;
bool cur_chunk_skip = false;
Block cur_chunk_data;
};

using ChunkBlockInputStreamPtr = std::shared_ptr<ChunkBlockInputStream>;
Expand Down
26 changes: 5 additions & 21 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
using DeltaValueSpacePtr = std::shared_ptr<DeltaValueSpace>;
using SharedLock = std::shared_lock<std::shared_mutex>;

size_t handle_column_pos;
HandleRange handle_range;

ChunkBlockInputStreamPtr stable_input_stream;
ChunkBlockInputStream * stable_input_stream_raw_ptr;

Expand Down Expand Up @@ -59,16 +56,12 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
bool delta_done = false;

public:
DeltaMergeBlockInputStream(size_t handle_column_pos_,
const HandleRange & handle_range_,
const ChunkBlockInputStreamPtr & stable_input_stream_,
DeltaMergeBlockInputStream(const ChunkBlockInputStreamPtr & stable_input_stream_,
const DeltaValueSpacePtr & delta_value_space_,
IndexIterator index_begin,
IndexIterator index_end,
size_t max_block_size_)
: handle_column_pos(handle_column_pos_),
handle_range(handle_range_),
stable_input_stream(stable_input_stream_),
: stable_input_stream(stable_input_stream_),
stable_input_stream_raw_ptr(stable_input_stream.get()),
delta_value_space(delta_value_space_),
entry_it(index_begin),
Expand Down Expand Up @@ -125,13 +118,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
if (limit == max_block_size)
continue;

Block block = header.cloneWithColumns(std::move(columns));

Block res = HandleFilter::filterSorted(handle_range, std::move(block), handle_column_pos);
if (!res || !res.rows())
continue;
else
return res;
return header.cloneWithColumns(std::move(columns));
}
return {};
}
Expand Down Expand Up @@ -160,11 +147,8 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
writeDeleteFromDelta(1);
break;
case DT_INS:
if (handle_range.check(delta_value_space->getHandle(tuple_id)))
{
writeInsertFromDelta(output_columns, tuple_id);
--output_write_limit;
}
writeInsertFromDelta(output_columns, tuple_id);
--output_write_limit;
break;
default:
throw Exception("Entry type " + DTTypeString(entry_it.getType()) + " is not supported, is end: "
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ DiskValueSpacePtr DiskValueSpace::doFlushCache(const OpContext & context, WriteB

ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const
{
return std::make_shared<ChunkBlockInputStream>(chunks, 0, HandleRange::newAll(), read_columns, page_reader, RSOperatorPtr());
return std::make_shared<ChunkBlockInputStream>(chunks, read_columns, page_reader, RSOperatorPtr());
}

size_t DiskValueSpace::num_rows() const
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/DeltaMerge/HandleFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ inline Block filterUnsorted(const HandleRange & handle_range, Block && block, si
}
} // namespace HandleFilter

template <bool is_block_sorted>
class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
{
public:
DMHandleFilterBlockInputStream(const BlockInputStreamPtr & input,
HandleRange handle_range_,
size_t handle_col_pos_,
bool is_block_sorted_)
: handle_range(handle_range_), handle_col_pos(handle_col_pos_), is_block_sorted(is_block_sorted_)
size_t handle_col_pos_)
: handle_range(handle_range_), handle_col_pos(handle_col_pos_)
{
children.push_back(input);
}
Expand Down Expand Up @@ -120,7 +120,6 @@ class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
private:
HandleRange handle_range;
size_t handle_col_pos;
bool is_block_sorted;
};

} // namespace DM
Expand Down
Loading