Skip to content

Commit

Permalink
Storages: Add whether has null in the result of MinMaxIndex (#9288)
Browse files Browse the repository at this point in the history
ref #9103

1. Refine `RSResult` so that it can express whether a null value is contained.
2. In `MinMaxIndex`, check whether the pack has a null value and, if so, mark the returned result as containing null.

Co-authored-by: JaySon <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 13, 2024
1 parent c6fcddf commit b4e8599
Show file tree
Hide file tree
Showing 19 changed files with 844 additions and 267 deletions.
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ static_assert(

static constexpr bool DM_RUN_CHECK = true;

// Bundles a column's name, its column id and its data type.
struct Attr
{
// Name of the column.
String col_name;
// Id of the column.
ColId col_id;
// Data type of the column.
DataTypePtr type;
};
// A list of column attributes.
using Attrs = std::vector<Attr>;

} // namespace DM
} // namespace DB

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false "
"because segment DTFile is shared with a neighbor segment, "
"first_pack_inc={} last_pack_inc={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}] segment={}",
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
at_least_result.first_pack_intersection,
at_least_result.last_pack_intersection,
fmt::join(prev_segment_file_ids, ","),
fmt::join(next_segment_file_ids, ","),
[&] {
Expand Down Expand Up @@ -687,8 +687,8 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
"check_result={} first_pack_inc={} last_pack_inc={} rows_at_least={} bytes_at_least={} file_rows={} "
"file_bytes={} segment={} ",
check_result,
magic_enum::enum_name(at_least_result.first_pack_intersection),
magic_enum::enum_name(at_least_result.last_pack_intersection),
at_least_result.first_pack_intersection,
at_least_result.last_pack_intersection,
at_least_result.rows,
at_least_result.bytes,
file_rows,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
const auto & pack_res = reader.pack_filter.getPackResConst();
for (size_t i = 0; i < n_packs; /*empty*/)
{
if (!isUse(pack_res[i]))
if (!pack_res[i].isUse())
{
++i;
continue;
}
size_t cur_offset_in_file = getOffsetInFile(i);
size_t end = i + 1;
// First, find the end of current available range.
while (end < n_packs && isUse(pack_res[end]))
while (end < n_packs && pack_res[end].isUse())
++end;

// Second, if the end of range is inside the block, we will need to read it too.
Expand Down
32 changes: 15 additions & 17 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void DMFilePackFilter::init(ReadTag read_tag)
pack_res.begin(),
[](RSResult a, RSResult b) { return a && b; });
}
auto [none_count, some_count, all_count] = countPackRes();
auto after_filter = some_count + all_count;
auto [none_count, some_count, all_count, all_null_count] = countPackRes();
auto after_filter = some_count + all_count + all_null_count;
ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter);
// In table scanning, DMFilePackFilter of a DMFile may be created several times:
// 1. When building MVCC bitmap (ReadTag::MVCC).
Expand All @@ -96,6 +96,7 @@ void DMFilePackFilter::init(ReadTag read_tag)
scan_context->rs_pack_filter_none += none_count;
scan_context->rs_pack_filter_some += some_count;
scan_context->rs_pack_filter_all += all_count;
scan_context->rs_pack_filter_all_null += all_null_count;
}

Float64 filter_rate = 0.0;
Expand All @@ -107,7 +108,8 @@ void DMFilePackFilter::init(ReadTag read_tag)
LOG_DEBUG(
log,
"RSFilter exclude rate: {:.2f}, after_pk: {}, after_read_packs: {}, after_filter: {}, handle_ranges: {}"
", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}, read_tag: {}",
", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}, all_null_count: {}, "
"read_tag: {}",
((after_read_packs == 0) ? std::numeric_limits<double>::quiet_NaN() : filter_rate),
after_pk,
after_read_packs,
Expand All @@ -118,37 +120,33 @@ void DMFilePackFilter::init(ReadTag read_tag)
none_count,
some_count,
all_count,
all_null_count,
magic_enum::enum_name(read_tag));
}

std::tuple<UInt64, UInt64, UInt64> DMFilePackFilter::countPackRes() const
std::tuple<UInt64, UInt64, UInt64, UInt64> DMFilePackFilter::countPackRes() const
{
UInt64 none_count = 0;
UInt64 some_count = 0;
UInt64 all_count = 0;
UInt64 all_null_count = 0;
for (auto res : pack_res)
{
switch (res)
{
case RSResult::None:
if (res == RSResult::None || res == RSResult::NoneNull)
++none_count;
break;
case RSResult::Some:
else if (res == RSResult::Some || res == RSResult::SomeNull)
++some_count;
break;
case RSResult::All:
else if (res == RSResult::All)
++all_count;
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is invalid", static_cast<Int32>(res));
}
else if (res == RSResult::AllNull)
++all_null_count;
}
return {none_count, some_count, all_count};
return {none_count, some_count, all_count, all_null_count};
}

UInt64 DMFilePackFilter::countUsePack() const
{
return std::count_if(pack_res.cbegin(), pack_res.cend(), [](RSResult res) { return isUse(res); });
return std::count_if(pack_res.cbegin(), pack_res.cend(), [](RSResult res) { return res.isUse(); });
}

void DMFilePackFilter::loadIndex(
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class DMFilePackFilter
const auto & pack_stats = dmfile->getPackStats();
for (size_t i = 0; i < pack_stats.size(); ++i)
{
if (isUse(pack_res[i]))
if (pack_res[i].isUse())
{
rows += pack_stats[i].rows;
bytes += pack_stats[i].bytes;
Expand Down Expand Up @@ -157,8 +157,8 @@ class DMFilePackFilter

void tryLoadIndex(ColId col_id);

// None, Some, All
std::tuple<UInt64, UInt64, UInt64> countPackRes() const;
// None+NoneNull, Some+SomeNull, All, AllNull
std::tuple<UInt64, UInt64, UInt64, UInt64> countPackRes() const;

private:
DMFilePtr dmfile;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows)
skip_rows = 0;
const auto & pack_res = pack_filter.getPackResConst();
const auto & pack_stats = dmfile->getPackStats();
for (; next_pack_id < pack_res.size() && !isUse(pack_res[next_pack_id]); ++next_pack_id)
for (; next_pack_id < pack_res.size() && !pack_res[next_pack_id].isUse(); ++next_pack_id)
{
skip_rows += pack_stats[next_pack_id].rows;
addSkippedRows(pack_stats[next_pack_id].rows);
Expand Down Expand Up @@ -151,7 +151,7 @@ size_t DMFileReader::getReadRows()
const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits<size_t>::max();
const auto & pack_stats = dmfile->getPackStats();
size_t read_rows = 0;
for (; next_pack_id < pack_res.size() && isUse(pack_res[next_pack_id]) && read_rows < rows_threshold_per_read;
for (; next_pack_id < pack_res.size() && pack_res[next_pack_id].isUse() && read_rows < rows_threshold_per_read;
++next_pack_id)
{
if (next_pack_id - start_pack_id >= read_pack_limit)
Expand Down Expand Up @@ -225,7 +225,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
// The algorithm runs as follows:
// When i = next_pack_id + 2, call read() to read {next_pack_id, next_pack_id + 1}th packs
// When i = next_pack_id + 5, call read() to read {next_pack_id + 3, next_pack_id + 4, next_pack_id + 5}th packs
if (isUse(pack_res[pack_id]) && (pack_id + 1 == pack_res.size() || !isUse(pack_res[pack_id + 1])))
if (pack_res[pack_id].isUse() && (pack_id + 1 == pack_res.size() || !pack_res[pack_id + 1].isUse()))
{
Block block = read();
size_t rows = block.rows();
Expand Down Expand Up @@ -256,7 +256,7 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
}
offset += rows;
}
else if (!isUse(pack_res[pack_id]))
else if (!pack_res[pack_id].isUse())
{
offset += pack_stats[pack_id].rows;
}
Expand Down Expand Up @@ -327,14 +327,14 @@ Block DMFileReader::read()
for (size_t i = start_pack_id; i < next_pack_id; ++i)
{
// If all handle in a pack are in the given range, and del column do clean read, we do not need to read handle column.
if (handle_res[i] == All
if (handle_res[i] == RSResult::All
&& std::find(del_column_clean_read_packs.cbegin(), del_column_clean_read_packs.cend(), i)
!= del_column_clean_read_packs.cend())
{
handle_column_clean_read_packs.push_back(i);
}
// If all handle in a pack are in the given range, but disable del clean read, we do not need to read handle column.
else if (!enable_del_clean_read && handle_res[i] == All)
else if (!enable_del_clean_read && handle_res[i] == RSResult::All)
{
handle_column_clean_read_packs.push_back(i);
}
Expand All @@ -350,7 +350,7 @@ Block DMFileReader::read()
{
// If all handle in a pack are in the given range, no not_clean rows, and max version <= max_read_version,
// we do not need to read handle column.
if (handle_res[i] == All && pack_stats[i].not_clean == 0
if (handle_res[i] == RSResult::All && pack_stats[i].not_clean == 0
&& pack_filter.getMaxVersion(i) <= max_read_version)
{
handle_column_clean_read_packs.push_back(i);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Filter/Like.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class Like : public ColCmpVal

RSResults roughCheck(size_t /*start_pack*/, size_t pack_count, const RSCheckParam & /*param*/) override
{
return RSResults(pack_count, Some);
return RSResults(pack_count, RSResult::Some);
}
};

} // namespace DB::DM
} // namespace DB::DM
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/Unsupported.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Unsupported : public RSOperator

RSResults roughCheck(size_t /*start_pack*/, size_t pack_count, const RSCheckParam & /*param*/) override
{
return RSResults(pack_count, Some);
return RSResults(pack_count, RSResult::Some);
}
};

Expand Down
40 changes: 28 additions & 12 deletions dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ inline std::pair<size_t, size_t> minmax(
// If the minimum value is null, this minmax index is generated before v6.4.0.
// For compatibility, the filter result of the corresponding pack should be Some,
// and the upper layer will read the pack data to perform the filter calculation.
//
// TODO: avoid hitting this compatibility check when all the fields of a pack are null or deleted.
ALWAYS_INLINE bool minIsNull(const DB::ColumnUInt8 & null_map, size_t i)
{
    // Pack `i` occupies two consecutive slots (min at i * 2, max at i * 2 + 1);
    // only the null flag of the min slot is inspected here.
    const size_t min_pos = i * 2;
    return null_map.getElement(min_pos);
}
const std::vector<Field> & values,
const DataTypePtr & type)
{
RSResults results(pack_count, RSResult::Some);
RSResults results(pack_count, RSResult::SomeNull);
const auto & minmaxes_data = toColumnVectorData<T>(column_nullable.getNestedColumnPtr());
for (size_t i = start_pack; i < start_pack + pack_count; ++i)
{
if (details::minIsNull(null_map, i))
continue;
auto min = minmaxes_data[i * 2];
auto max = minmaxes_data[i * 2 + 1];
results[i - start_pack] = RoughCheck::CheckIn::check<T>(values, type, min, max);
auto value_result = RoughCheck::CheckIn::check<T>(values, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand All @@ -256,7 +259,7 @@ RSResults MinMaxIndex::checkNullableIn(
const auto & column_nullable = static_cast<const ColumnNullable &>(*minmaxes);
const auto & null_map = column_nullable.getNullMapColumn();

RSResults results(pack_count, RSResult::Some);
RSResults results(pack_count, RSResult::SomeNull);
const auto * raw_type = type.get();

#define DISPATCH(TYPE) \
Expand Down Expand Up @@ -292,7 +295,8 @@ RSResults MinMaxIndex::checkNullableIn(
pos = i * 2 + 1;
prev_offset = offsets[pos - 1];
auto max = String(chars[prev_offset], offsets[pos] - prev_offset - 1);
results[i - start_pack] = RoughCheck::CheckIn::check<String>(values, type, min, max);
auto value_result = RoughCheck::CheckIn::check<String>(values, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand Down Expand Up @@ -335,7 +339,8 @@ RSResults MinMaxIndex::checkInImpl(
continue;
auto min = minmaxes_data[i * 2];
auto max = minmaxes_data[i * 2 + 1];
results[i - start_pack] = RoughCheck::CheckIn::check<T>(values, type, min, max);
auto value_result = RoughCheck::CheckIn::check<T>(values, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand Down Expand Up @@ -383,7 +388,8 @@ RSResults MinMaxIndex::checkIn(
pos = i * 2 + 1;
prev_offset = offsets[pos - 1];
auto max = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
results[i - start_pack] = RoughCheck::CheckIn::check<String>(values, type, min, max);
auto value_result = RoughCheck::CheckIn::check<String>(values, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand All @@ -406,7 +412,8 @@ RSResults MinMaxIndex::checkCmpImpl(size_t start_pack, size_t pack_count, const
continue;
auto min = minmaxes_data[i * 2];
auto max = minmaxes_data[i * 2 + 1];
results[i - start_pack] = Op::template check<T>(value, type, min, max);
auto value_result = Op::template check<T>(value, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand Down Expand Up @@ -452,7 +459,8 @@ RSResults MinMaxIndex::checkCmp(size_t start_pack, size_t pack_count, const Fiel
pos = i * 2 + 1;
prev_offset = offsets[pos - 1];
auto max = String(reinterpret_cast<const char *>(&chars[prev_offset]), offsets[pos] - prev_offset - 1);
results[i - start_pack] = Op::template check<String>(value, type, min, max);
auto value_result = Op::template check<String>(value, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand Down Expand Up @@ -489,15 +497,16 @@ RSResults MinMaxIndex::checkNullableCmpImpl(
const Field & value,
const DataTypePtr & type)
{
RSResults results(pack_count, RSResult::Some);
RSResults results(pack_count, RSResult::SomeNull);
const auto & minmaxes_data = toColumnVectorData<T>(column_nullable.getNestedColumnPtr());
for (size_t i = start_pack; i < start_pack + pack_count; ++i)
{
if (details::minIsNull(null_map, i))
continue;
auto min = minmaxes_data[i * 2];
auto max = minmaxes_data[i * 2 + 1];
results[i - start_pack] = Op::template check<T>(value, type, min, max);
auto value_result = Op::template check<T>(value, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand All @@ -512,7 +521,7 @@ RSResults MinMaxIndex::checkNullableCmp(
const auto & column_nullable = static_cast<const ColumnNullable &>(*minmaxes);
const auto & null_map = column_nullable.getNullMapColumn();

RSResults results(pack_count, RSResult::Some);
RSResults results(pack_count, RSResult::SomeNull);
const auto * raw_type = type.get();

#define DISPATCH(TYPE) \
Expand Down Expand Up @@ -548,7 +557,8 @@ RSResults MinMaxIndex::checkNullableCmp(
pos = i * 2 + 1;
prev_offset = offsets[pos - 1];
auto max = String(chars[prev_offset], offsets[pos] - prev_offset - 1);
results[i - start_pack] = Op::template check<String>(value, type, min, max);
auto value_result = Op::template check<String>(value, type, min, max);
results[i - start_pack] = addNullIfHasNull(value_result, i);
}
return results;
}
Expand Down Expand Up @@ -591,4 +601,10 @@ RSResults MinMaxIndex::checkIsNull(size_t start_pack, size_t pack_count)
return results;
}

// Returns `value_result` unchanged when `has_null_marks[i]` is zero; otherwise
// returns it after calling `setHasNull()` on it. Callers in this file pass the
// pack index as `i`.
RSResult MinMaxIndex::addNullIfHasNull(RSResult value_result, size_t i) const
{
    // No null recorded for this entry: hand the value-based result back as is.
    if (!has_null_marks[i])
        return value_result;

    value_result.setHasNull();
    return value_result;
}
} // namespace DB::DM
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class MinMaxIndex
const std::vector<Field> & values,
const DataTypePtr & type);

RSResult addNullIfHasNull(RSResult value_result, size_t i) const;

PaddedPODArray<UInt8> has_null_marks;
PaddedPODArray<UInt8> has_value_marks;
MutableColumnPtr minmaxes;
Expand Down
Loading

0 comments on commit b4e8599

Please sign in to comment.