Skip to content

Commit

Permalink
stable only
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Dec 21, 2022
1 parent 8e60340 commit 371a53f
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 45 deletions.
31 changes: 28 additions & 3 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,31 @@

namespace DB::DM
{
// Creates a filter with `size_` entries, each initialised to `default_value`,
// and keeps a reference to the segment snapshot `snapshot_`.
// `all_match` starts out equal to `default_value`: a filter created with every
// entry set to true is an "all rows match" filter from the start.
BitmapFilter::BitmapFilter(UInt32 size_, const SegmentSnapshotPtr & snapshot_, bool default_value)
    : filter(size_, default_value)
    , snap(snapshot_)
    , all_match(default_value)
{}

// Drains `stream` and records every block it yields in this filter.
// The stream is bracketed by readPrefix()/readSuffix(). Each non-empty block is
// forwarded, together with the filter pointer produced by read(), to
// set(const ColumnPtr &, const FilterPtr) using the block's segment row id column.
// Reading stops at the first empty block.
void BitmapFilter::set(BlockInputStreamPtr & stream)
{
    stream->readPrefix();
    while (true)
    {
        // Reset for every block; read() may fill it in.
        FilterPtr block_filter = nullptr;
        auto block = stream->read(block_filter, /*res_filter*/true);
        if (!likely(block))
        {
            break;
        }
        set(block.segmentRowIdCol(), block_filter);
    }
    stream->readSuffix();
}

void BitmapFilter::set(const ColumnPtr & col, const FilterPtr f)
{
const auto * v = toColumnVectorDataPtr<UInt32>(col);
Expand Down Expand Up @@ -57,6 +76,12 @@ void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr f)
}
}

// Sets the `limit` consecutive entries beginning at index `start` to true.
// The RUNTIME_CHECK fails when [start, start + limit) does not lie inside the filter.
void BitmapFilter::set(UInt32 start, UInt32 limit)
{
    // Add in 64 bits: evaluated in UInt32 (assumed to be a 32-bit unsigned type),
    // `start + limit` can wrap around and let a range that ends far beyond the
    // filter slip through the bounds check.
    const UInt64 end = static_cast<UInt64>(start) + limit;
    RUNTIME_CHECK(end <= filter.size(), start, limit, filter.size());
    std::fill(filter.begin() + start, filter.begin() + start + limit, true);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ using SegmentSnapshotPtr = std::shared_ptr<SegmentSnapshot>;
class BitmapFilter
{
public:
BitmapFilter(UInt32 size_, const SegmentSnapshotPtr & snapshot_);
BitmapFilter(UInt32 size_, const SegmentSnapshotPtr & snapshot_, bool default_value);

void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr f);
void set(const UInt32 * data, UInt32 size, const FilterPtr f);

void set(UInt32 start, UInt32 limit);
// Returns true if all data matches; in that case the filter `f` is not filled.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

Expand Down
188 changes: 161 additions & 27 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr
streams.push_back(stream);
}

BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream>(streams);
BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream<>>(streams);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, read_ranges, 0);
stable_stream->readPrefix();
while (true)
Expand Down Expand Up @@ -606,21 +606,6 @@ bool Segment::useCleanRead(const SegmentSnapshotPtr & segment_snap,
&& !hasColumn(columns_to_read, TAG_COLUMN_ID);
}

// Decides whether a read of this snapshot should go through the bitmap filter.
// Returns false when the `dt_enable_bitmap_filter` setting is off, when the read is
// restricted to delta only or to stable only, or when useCleanRead() accepts the
// snapshot/columns combination; returns true otherwise.
bool Segment::useBitmapFilter(const DMContext & dm_context,
                              const SegmentSnapshotPtr & segment_snap,
                              const ColumnDefines & columns_to_read)
{
    const auto & settings = dm_context.db_context.getSettingsRef();
    const bool bitmap_disabled = !settings.dt_enable_bitmap_filter;
    const bool partial_read = dm_context.read_delta_only || dm_context.read_stable_only;
    // useCleanRead() is only consulted when neither of the cheap checks rules the filter out.
    return !(bitmap_disabled || partial_read) && !useCleanRead(segment_snap, columns_to_read);
}

BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
Expand Down Expand Up @@ -2276,6 +2261,25 @@ BitmapFilterPtr Segment::buildBitmapFilter(const DMContext & dm_context,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
RUNTIME_CHECK_MSG(!dm_context.read_delta_only, "Read delta only is unsupported");

if (dm_context.read_stable_only || (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0))
{
return buildBitmapFilterStableOnly(dm_context, segment_snap, read_ranges, filter, max_version, expected_block_size);
}
else
{
return buildBitmapFilterNormal(dm_context, segment_snap, read_ranges, filter, max_version, expected_block_size);
}
}

BitmapFilterPtr Segment::buildBitmapFilterNormal(const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size)
{
Stopwatch sw_total;
static ColumnDefines columns_to_read{
Expand All @@ -2291,22 +2295,152 @@ BitmapFilterPtr Segment::buildBitmapFilter(const DMContext & dm_context,
expected_block_size,
/*need_row_id*/ true);
auto total_rows = segment_snap->delta->getRows() + segment_snap->stable->getDMFilesRows();
auto bitmap_filter = std::make_shared<BitmapFilter>(total_rows, segment_snap);
for (;;)
{
FilterPtr f = nullptr;
auto blk = stream->read(f, true);
if (likely(blk))
auto bitmap_filter = std::make_shared<BitmapFilter>(total_rows, segment_snap, /*default_value*/false);
bitmap_filter->set(stream);
bitmap_filter->runOptimize();
LOG_DEBUG(log, "buildBitmapFilter total_rows={} cost={}ms", total_rows, sw_total.elapsedMilliseconds());
return bitmap_filter;
}

// Location of one pack inside a DMFile, as collected by parseDMFilePackInfo().
// Members carry default initializers so a default-constructed PackInfo never holds
// indeterminate values; the struct remains an aggregate.
struct PackInfo
{
    // Index of the pack in the DMFile's pack stats.
    UInt64 pack_id = 0;
    // Number of rows that precede this pack inside the same DMFile.
    UInt64 offset = 0;
    // Number of rows in this pack.
    UInt64 rows = 0;
};
// Classifies the packs of `dmfile` for building a bitmap filter.
//
// A DMFilePackFilter is loaded for `read_ranges` and `filter`, then every pack is
// put into one of three groups:
//   * not selected by the pack filter  -> ignored;
//   * selected and known to match completely (handle result is All, no "not clean"
//     rows, max version <= `read_version`) -> returned in the first element together
//     with its row offset inside the DMFile, so the caller can set the bitmap
//     range directly without reading the pack;
//   * selected but needing a RowKey and/or MVCC check -> its id is added to the
//     id set returned as the second element; these packs have to be read.
std::pair<std::vector<PackInfo>, IdSetPtr> parseDMFilePackInfo(const DMFilePtr & dmfile,
                                                              const DMContext & dm_context,
                                                              const RowKeyRanges & read_ranges,
                                                              const RSOperatorPtr & filter,
                                                              UInt64 read_version)
{
    auto index_cache = dm_context.db_context.getMinMaxIndexCache();
    auto file_provider = dm_context.db_context.getFileProvider();
    auto read_limiter = dm_context.db_context.getReadLimiter();
    DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
        dmfile,
        index_cache,
        /*set_cache_if_miss*/ true,
        read_ranges,
        filter,
        /*read_pack*/ {},
        file_provider,
        read_limiter,
        dm_context.scan_context,
        dm_context.tracing_id);
    const auto & use_packs = pack_filter.getUsePacks();
    const auto & handle_res = pack_filter.getHandleRes();
    const auto & pack_stats = dmfile->getPackStats();

    // Packs that all rows compliant with MVCC filter and RowKey filter requirements.
    // For building bitmap filter, we don't need to read these packs,
    // just set corresponding positions in the bitmap to true.
    std::vector<PackInfo> all_packs;
    // Packs that some rows compliant with MVCC filter and RowKey filter requirements.
    // We need to read these packs and do RowKey filter and MVCC filter for them.
    auto some_packs = std::make_shared<IdSet>();
    // Running total of rows up to and including the current pack; used to derive
    // each pack's row offset inside the DMFile.
    UInt64 rows = 0;
    for (size_t pack_id = 0; pack_id < pack_stats.size(); pack_id++)
    {
        const auto & pack_stat = pack_stats[pack_id];
        rows += pack_stat.rows;
        if (!use_packs[pack_id])
        {
            // Not selected by the pack filter: nothing to record for this pack.
            continue;
        }

        // assert(handle_res[pack_id] == RSResult::Some || handle_res[pack_id] == RSResult::All);

        if (handle_res[pack_id] == RSResult::Some)
        {
            // We need to read this pack to do RowKey filter.
            some_packs->insert(pack_id);
            continue;
        }

        // assert(handle_res[pack_id] == RSResult::All);

        if (pack_stat.not_clean > 0)
        {
            // We need to read this pack to do MVCC filter.
            some_packs->insert(pack_id);
            continue;
        }

        auto max_version = pack_filter.getMaxVersion(pack_id);
        if (max_version > read_version)
        {
            // We need to read this pack to do MVCC filter.
            some_packs->insert(pack_id);
            continue;
        }
        all_packs.push_back({pack_id, rows - pack_stat.rows, pack_stat.rows});
    }
    // Move the results into the pair instead of copying the vector.
    return {std::move(all_packs), std::move(some_packs)};
}

// Builds the bitmap filter for a read that only has to look at the stable layer.
//
// Every DMFile of the snapshot is classified with parseDMFilePackInfo():
//   * if all packs of all DMFiles match completely, a filter with every entry set
//     to true is returned and nothing is read;
//   * otherwise the ranges of the completely matching packs are set directly, and
//     only the remaining selected packs are read through a RowKey filter and an
//     MVCC version filter (at `read_version`) to set the surviving rows.
// The filter is sized to the number of rows in the stable DMFiles.
BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_context,
                                                     const SegmentSnapshotPtr & segment_snap,
                                                     const RowKeyRanges & read_ranges,
                                                     const RSOperatorPtr & filter,
                                                     UInt64 read_version,
                                                     size_t expected_block_size)
{
    const auto & dmfiles = segment_snap->stable->getDMFiles();
    RUNTIME_CHECK(!dmfiles.empty(), dmfiles.size());
    // One slot per DMFile (sized construction, not a one-element list).
    std::vector<std::vector<PackInfo>> all_packs(dmfiles.size());
    std::vector<IdSetPtr> some_packs(dmfiles.size());
    bool all_match = true;
    for (size_t i = 0; i < dmfiles.size(); i++)
    {
        std::tie(all_packs[i], some_packs[i]) = parseDMFilePackInfo(dmfiles[i],
                                                                    dm_context,
                                                                    read_ranges,
                                                                    filter,
                                                                    read_version);
        all_match = all_match && (all_packs[i].size() == dmfiles[i]->getPacks());
    }

    if (all_match)
    {
        return std::make_shared<BitmapFilter>(segment_snap->stable->getDMFilesRows(), segment_snap, /*default_value*/true);
    }

    auto bitmap_filter = std::make_shared<BitmapFilter>(segment_snap->stable->getDMFilesRows(), segment_snap, /*default_value*/false);
    // Pack offsets are relative to their own DMFile; shift them by the rows of the
    // DMFiles that come before it.
    UInt32 preceded_dmfile_rows = 0;
    for (size_t i = 0; i < dmfiles.size(); i++)
    {
        const auto & all_pack = all_packs[i];
        for (const auto & pack : all_pack)
        {
            bitmap_filter->set(pack.offset + preceded_dmfile_rows, pack.rows);
        }
        preceded_dmfile_rows += dmfiles[i]->getRows();
    }

    // Deliberately not `static`: a function-local static would be initialised only
    // once, from the `is_common_handle` of whichever call ran first, and then be
    // reused by every later call regardless of its own `is_common_handle`.
    // The stream built from it is consumed and destroyed inside this function, so
    // a plain local lives long enough.
    // NOTE(review): only the handle column is requested although the stream is
    // wrapped in an MVCC version filter below - confirm no version/tag column is
    // needed in this list.
    ColumnDefines columns_to_read{
        getExtraHandleColumnDefine(is_common_handle),
    };

    BlockInputStreamPtr stream = segment_snap->stable->getInputStream(dm_context,
                                                                      columns_to_read,
                                                                      read_ranges,
                                                                      filter,
                                                                      read_version,
                                                                      expected_block_size,
                                                                      /*enable_handle_clean_read*/ false,
                                                                      /*is_fast_scan*/ false,
                                                                      /*enable_del_clean_read*/ false,
                                                                      /*read_packs*/ some_packs,
                                                                      /*need_row_id*/ true);
    stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stream, read_ranges, 0);
    stream = std::make_shared<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_MVCC>>(
        stream,
        columns_to_read,
        read_version,
        is_common_handle,
        dm_context.tracing_id);
    bitmap_filter->set(stream);
    return bitmap_filter;
}

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,6 @@ class Segment

void setLastCheckGCSafePoint(DB::Timestamp gc_safe_point) { last_check_gc_safe_point.store(gc_safe_point, std::memory_order_relaxed); }

static bool useBitmapFilter(const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
const ColumnDefines & columns_to_read);

private:
ReadInfo getReadInfo(
const DMContext & dm_context,
Expand Down Expand Up @@ -607,6 +603,18 @@ class Segment
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size);
// Builds the bitmap filter over both delta and stable rows.
// buildBitmapFilter() dispatches here when the read is not stable-only and the
// delta layer of the snapshot has rows or deletes.
BitmapFilterPtr buildBitmapFilterNormal(const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size);
// Builds the bitmap filter from the stable layer only.
// buildBitmapFilter() dispatches here when the read is stable-only, or when the
// delta layer of the snapshot has neither rows nor deletes.
BitmapFilterPtr buildBitmapFilterStableOnly(const DMContext & dm_context,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size);
BlockInputStreamPtr getBitmapFilterInputStream(BitmapFilterPtr && bitmap_filter,
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ bool SegmentReadTasksWrapper::empty() const
return ordered_tasks.empty() && unordered_tasks.empty();
}

ReadMode SegmentReadTaskPool::readModeOfSegment(SegmentReadTaskPtr & t)
ReadMode SegmentReadTaskPool::readModeOfSegment()
{
if (read_mode == ReadMode::Normal
&& Segment::useBitmapFilter(*dm_context, t->read_snapshot, columns_to_read))
if (read_mode == ReadMode::Normal &&
dm_context->db_context.getSettingsRef().dt_enable_bitmap_filter &&
!dm_context->read_delta_only)
{
return ReadMode::Bitmap;
}
Expand All @@ -167,7 +168,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
MemoryTrackerSetter setter(true, mem_tracker.get());
BlockInputStreamPtr stream;
auto block_size = std::max(expected_block_size, static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
auto mode = readModeOfSegment(t);
auto mode = readModeOfSegment();
stream = t->segment->getInputStream(mode, *dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size);
LOG_DEBUG(log, "getInputStream succ, read_mode={}, pool_id={} segment_id={}", magic_enum::enum_name(mode), pool_id, t->segment->segmentId());
return stream;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class SegmentReadTaskPool : private boost::noncopyable
bool exceptionHappened() const;
void finishSegment(const SegmentPtr & seg);
void pushBlock(Block && block);
ReadMode readModeOfSegment(SegmentReadTaskPtr & t);
ReadMode readModeOfSegment(); // TODO: determine read mode in delta merge

const uint64_t pool_id;
const int64_t table_id;
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>

Expand Down Expand Up @@ -52,6 +53,7 @@ class EmptySkippableBlockInputStream : public SkippableBlockInputStream
ColumnDefines read_columns;
};

template <bool need_row_id = false>
class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
public:
Expand Down Expand Up @@ -112,6 +114,10 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
if constexpr (need_row_id)
{
res.setSegmentRowIdCol(createSegmentRowIdCol(res.startOffset(), res.rows()));
}
break;
}
else
Expand All @@ -126,6 +132,17 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
}

private:
// Creates a UInt32 column holding `limit` consecutive row ids:
// start, start + 1, ..., start + limit - 1 (each stored as UInt32).
ColumnPtr createSegmentRowIdCol(UInt64 start, UInt64 limit)
{
    auto row_id_col = ColumnUInt32::create();
    ColumnUInt32::Container & ids = row_id_col->getData();
    ids.resize(limit);
    UInt64 next_id = start;
    for (auto & id : ids)
    {
        id = next_id++;
    }
    return row_id_col;
}
BlockInputStreams::iterator current_stream;
std::vector<size_t> rows;
size_t precede_stream_rows;
Expand Down
Loading

0 comments on commit 371a53f

Please sign in to comment.