Skip to content

Commit

Permalink
Storage: Refactor DMFileReader
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Mar 20, 2024
1 parent 63e83b3 commit 1d1e04f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 158 deletions.
219 changes: 66 additions & 153 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Storages/S3/S3RandomAccessFile.h>
#include <fmt/format.h>


namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
Expand Down Expand Up @@ -78,7 +79,7 @@ DMFileReader::DMFileReader(
, enable_column_cache(enable_column_cache_ && column_cache_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
, skip_packs_by_column(read_columns.size(), 0)
, must_skip_next_read(read_columns.size(), false)
, mark_cache(mark_cache_)
, column_cache(column_cache_)
, scan_context(scan_context_)
Expand Down Expand Up @@ -111,10 +112,6 @@ DMFileReader::DMFileReader(
if (max_sharing_column_count > 0)
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, max_sharing_column_count, log);
for (const auto & cd : read_columns)
{
last_read_from_cache[cd.id] = false;
}
}
}

Expand All @@ -141,43 +138,14 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows)

size_t DMFileReader::skipNextBlock()
{
// Go to next available pack.
size_t skip;
if (!getSkippedRows(skip))
return 0;

// Find the next contiguous packs will be read in next read,
// let next_pack_id point to the next pack of the contiguous packs.
// For example, if we have 10 packs, use_packs is [0, 1, 1, 0, 1, 1, 0, 0, 1, 1],
// and now next_pack_id is 1, then we will skip 2 packs(index 1 and 2), and next_pack_id will be 3.
const size_t read_pack_limit = read_one_pack_every_time ? 1 : 0;
const std::vector<RSResult> & handle_res = pack_filter.getHandleRes();
const RSResult expected_handle_res = handle_res[next_pack_id];
auto & use_packs = pack_filter.getUsePacks();
size_t start_pack_id = next_pack_id;
const auto & pack_stats = dmfile->getPackStats();
size_t read_rows = 0;
for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read;
++next_pack_id)
{
if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit)
break;
if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res)
break;

read_rows += pack_stats[next_pack_id].rows;
}

// Only move forward the next_pack_id and next_row_offset, but don't read any data.
auto [read_rows, cr_on_normal_mode, cr_on_handle_on_fast_mode, cr_on_del_on_fast_mode] = getReadRows();
(void)cr_on_normal_mode, (void)cr_on_handle_on_fast_mode, (void)cr_on_del_on_fast_mode;
addSkippedRows(read_rows);
next_row_offset += read_rows;

// When we read dmfile, if the previous pack is not read,
// then we should seek to the right offset of dmfile.
// So if skip some packs successfully,
// then we set the last pack to false to indicate that we should seek before read.
if (likely(read_rows > 0))
use_packs[next_pack_id - 1] = false;

std::fill(must_skip_next_read.begin(), must_skip_next_read.end(), true);
scan_context->late_materialization_skip_rows += read_rows;
return read_rows;
}
Expand Down Expand Up @@ -294,119 +262,90 @@ bool DMFileReader::isCacheableColumn(const ColumnDefine & cd)
return cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == VERSION_COLUMN_ID;
}

Block DMFileReader::read()
// <read_rows, do_clean_read_on_normal_mode, do_clean_read_on_handle_on_fast_mode, do_clean_read_on_del_on_fast_mode>
std::tuple<size_t, bool, bool, bool> DMFileReader::getReadRows()
{
Stopwatch watch;
SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed(););

// Go to next available pack.
size_t skip_rows;

getSkippedRows(skip_rows);
const auto & use_packs = pack_filter.getUsePacksConst();

const auto & use_packs = pack_filter.getUsePacksConst();
if (next_pack_id >= use_packs.size())
return {};
return {0, false, false, false};

// Find max continuing rows we can read.
size_t start_pack_id = next_pack_id;
size_t start_row_offset = next_row_offset;
// When read_one_pack_every_time is true, we can just read one pack every time.
// 0 means no limit
size_t read_pack_limit = read_one_pack_every_time ? 1 : 0;
// std::numeric_limits<UInt64>::max() means no limit
size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits<UInt64>::max();

const auto & pack_stats = dmfile->getPackStats();

const auto & pack_properties = dmfile->getPackProperties();

size_t read_rows = 0;
size_t not_clean_rows = 0;
size_t deleted_rows = 0;
// Clean read optimize
bool do_clean_read_on_normal_mode = enable_handle_clean_read && !is_fast_scan;
bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan;

const std::vector<RSResult> & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter
const auto & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter
RSResult expected_handle_res = handle_res[next_pack_id];
do_clean_read_on_normal_mode = do_clean_read_on_normal_mode && (expected_handle_res == All);
do_clean_read_on_handle_on_fast_mode = do_clean_read_on_handle_on_fast_mode && (expected_handle_res == All);
for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read;
++next_pack_id)
{
if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit)
if (next_pack_id - start_pack_id >= read_pack_limit)
break;
if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res)
// If try to do clean read on handle column, then handle_res of all read packs should be All.
if ((do_clean_read_on_normal_mode || do_clean_read_on_handle_on_fast_mode) && handle_res[next_pack_id] != All)
break;

read_rows += pack_stats[next_pack_id].rows;
not_clean_rows += pack_stats[next_pack_id].not_clean;
do_clean_read_on_normal_mode = do_clean_read_on_normal_mode && (pack_stats[next_pack_id].not_clean == 0)
&& (pack_filter.getMaxVersion(next_pack_id) <= max_read_version);
// Because deleted_rows is a new field in pack_properties, we need to check whether this pack has this field.
// If this pack doesn't have this field, then we can't know whether this pack contains deleted rows.
// Thus we just deleted_rows += 1, to make sure we will not do the optimization with del column(just to make deleted_rows != 0).
if (static_cast<size_t>(pack_properties.property_size()) > next_pack_id
&& pack_properties.property(next_pack_id).has_deleted_rows())
{
deleted_rows += pack_properties.property(next_pack_id).deleted_rows();
}
else
{
deleted_rows += 1;
}
// So only when this pack has this field and deleted_rows is 0, we can do clean read on del column.
do_clean_read_on_del_on_fast_mode = do_clean_read_on_del_on_fast_mode
&& (static_cast<size_t>(pack_properties.property_size()) > next_pack_id)
&& (pack_properties.property(next_pack_id).has_deleted_rows())
&& (pack_properties.property(next_pack_id).deleted_rows() == 0);
}
next_row_offset += read_rows;
return {
read_rows,
do_clean_read_on_normal_mode,
do_clean_read_on_handle_on_fast_mode,
do_clean_read_on_del_on_fast_mode,
};
}

Block DMFileReader::read()
{
Stopwatch watch;
SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed(););

size_t start_pack_id = next_pack_id;
auto [read_rows, cr_on_normal_mode, cr_on_handle_on_fast_mode, cr_on_del_on_fast_mode] = getReadRows();

if (read_rows == 0)
return {};

Block res;
res.setStartOffset(start_row_offset);
res.setStartOffset(next_row_offset);
next_row_offset += read_rows;

size_t read_packs = next_pack_id - start_pack_id;

addScannedRows(read_rows);

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode
= enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_scan);

bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan && deleted_rows == 0;

if (do_clean_read_on_normal_mode)
{
UInt64 max_version = 0;
for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id)
max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version);
do_clean_read_on_normal_mode = max_version <= max_read_version;
}

const auto & pack_stats = dmfile->getPackStats();
for (size_t i = 0; i < read_columns.size(); ++i)
{
try
{
// For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK.
const auto & cd = read_columns[i];
if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle_on_fast_mode)
{
// Return the first row's handle
ColumnPtr column;
if (is_common_handle)
{
StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size));
}
else
{
Handle min_handle = pack_filter.getMinHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle));
}
res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = read_packs;
}
else if (cd.id == TAG_COLUMN_ID && do_clean_read_on_del_on_fast_mode)
{
ColumnPtr column = cd.type->createColumnConst(
read_rows,
Field(static_cast<UInt64>(pack_stats[start_pack_id].first_tag)));
res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id});

skip_packs_by_column[i] = read_packs;
}
else if (do_clean_read_on_normal_mode && isExtraColumn(cd))
if ((cr_on_normal_mode || cr_on_handle_on_fast_mode || cr_on_del_on_fast_mode) && isExtraColumn(cd))
{
ColumnPtr column;
if (cd.id == EXTRA_HANDLE_COLUMN_ID)
Expand Down Expand Up @@ -435,34 +374,28 @@ Block DMFileReader::read()
}

res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id});

skip_packs_by_column[i] = read_packs;
must_skip_next_read[i] = true;
}
else
{
const auto stream_name = DMFile::getFileNameBase(cd.id);
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
{
auto data_type = dmfile->getColumnStat(cd.id).type;
ColumnPtr column;
if (enable_column_cache && isCacheableColumn(cd))
{
auto read_strategy = column_cache->getReadStrategy(start_pack_id, read_packs, cd.id);

auto data_type = dmfile->getColumnStat(cd.id).type;
auto column = data_type->createColumn();
column->reserve(read_rows);
for (auto & [range, strategy] : read_strategy)
{
if (strategy == ColumnCache::Strategy::Memory)
{
for (size_t cursor = range.first; cursor < range.second; cursor++)
{
auto cache_element = column_cache->getColumn(cursor, cd.id);
column->insertRangeFrom(
*(cache_element.first),
cache_element.second.first,
cache_element.second.second);
column = cache_element.first;
}
skip_packs_by_column[i] += (range.second - range.first);
must_skip_next_read[i] = true;
}
else if (strategy == ColumnCache::Strategy::Disk)
{
Expand All @@ -471,53 +404,34 @@ Block DMFileReader::read()
{
rows_count += pack_stats[cursor].rows;
}
ColumnPtr col;
readColumn(
cd,
col,
range.first,
range.second - range.first,
rows_count,
skip_packs_by_column[i]);
column->insertRangeFrom(*col, 0, col->size());
skip_packs_by_column[i] = 0;
readColumn(cd, column, range.first, range.second - range.first, rows_count, i);
}
else
{
throw Exception("Unknown strategy", ErrorCodes::LOGICAL_ERROR);
}
}
ColumnPtr result_column = std::move(column);
size_t rows_offset = 0;
for (size_t cursor = start_pack_id; cursor < start_pack_id + read_packs; cursor++)
{
column_cache
->tryPutColumn(cursor, cd.id, result_column, rows_offset, pack_stats[cursor].rows);
column_cache->tryPutColumn(cursor, cd.id, column, rows_offset, pack_stats[cursor].rows);
rows_offset += pack_stats[cursor].rows;
}
// Cast column's data from DataType in disk to what we need now
auto converted_column
= convertColumnByColumnDefineIfNeed(data_type, std::move(result_column), cd);
res.insert(ColumnWithTypeAndName{converted_column, cd.type, cd.name, cd.id});
}
else
{
auto data_type = dmfile->getColumnStat(cd.id).type;
ColumnPtr column;
readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i]);
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd);

res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
readColumn(cd, column, start_pack_id, read_packs, read_rows, i);
}
// Cast column's data from DataType in disk to what we need now
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd);
res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id});
}
else
{
// New column after ddl is not exist in this DMFile, fill with default value
ColumnPtr column = createColumnWithDefaultValue(cd, read_rows);

res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
must_skip_next_read[i] = false;
}
}
}
Expand All @@ -535,7 +449,6 @@ void DMFileReader::readFromDisk(
MutableColumnPtr & column,
size_t start_pack_id,
size_t read_rows,
size_t skip_packs,
bool force_seek)
{
const auto stream_name = DMFile::getFileNameBase(column_define.id);
Expand All @@ -549,7 +462,7 @@ void DMFileReader::readFromDisk(

#endif
auto & top_stream = iter->second;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
bool should_seek = force_seek || shouldSeek(start_pack_id);
auto data_type = dmfile->getColumnStat(column_define.id).type;
data_type->deserializeBinaryBulkWithMultipleStreams( //
*column,
Expand Down Expand Up @@ -578,7 +491,7 @@ void DMFileReader::readColumn(
size_t start_pack_id,
size_t pack_count,
size_t read_rows,
size_t skip_packs)
size_t column_index)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column))
Expand All @@ -591,13 +504,13 @@ void DMFileReader::readColumn(
= has_concurrent_reader ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
auto data_type = dmfile->getColumnStat(column_define.id).type;
auto col = data_type->createColumn();
readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]);
readFromDisk(column_define, col, start_pack_id, read_rows, must_skip_next_read[column_index]);
column = std::move(col);
last_read_from_cache[column_define.id] = false;
must_skip_next_read[column_index] = false;
}
else
{
last_read_from_cache[column_define.id] = true;
must_skip_next_read[column_index] = true;
}

if (has_concurrent_reader && col_data_cache != nullptr)
Expand Down
Loading

0 comments on commit 1d1e04f

Please sign in to comment.