diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 54c3c896324..20898231ea2 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -541,8 +541,12 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, { (void)handle; if (read_segments.empty() || read_segments.count(segment->segmentId())) - tasks.push( - std::make_shared(segment, segment->createSnapshot(*dm_context), HandleRanges{segment->getRange()})); + { + auto segment_snap = segment->createSnapshot(*dm_context); + if (unlikely(!segment_snap)) + throw Exception("Failed to get segment snap", ErrorCodes::LOGICAL_ERROR); + tasks.push(std::make_shared(segment, segment_snap, HandleRanges{segment->getRange()})); + } } } @@ -607,8 +611,11 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, { if (tasks.empty() || tasks.back()->segment != seg_it->second) { - auto segment = seg_it->second; - tasks.push(std::make_shared(segment, segment->createSnapshot(*dm_context))); + auto segment = seg_it->second; + auto segment_snap = segment->createSnapshot(*dm_context); + if (unlikely(!segment_snap)) + throw Exception("Failed to get segment snap", ErrorCodes::LOGICAL_ERROR); + tasks.push(std::make_shared(segment, segment_snap)); } tasks.back()->addRange(req_range); @@ -1002,7 +1009,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP { LOG_DEBUG(log, "Split segment " << segment->info()); - SegmentSnapshot segment_snap; + SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); @@ -1079,8 +1086,8 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le { LOG_DEBUG(log, "Merge Segment [" << left->info() << "] and [" << right->info() << "]"); - SegmentSnapshot left_snap; - SegmentSnapshot right_snap; + SegmentSnapshotPtr left_snap; + SegmentSnapshotPtr right_snap; { std::shared_lock lock(read_write_mutex); @@ -1161,7 +1168,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm { LOG_DEBUG(log, (is_foreground ? "Foreground" : "Background") << " merge delta, segment [" << segment->segmentId() << "]"); - SegmentSnapshot segment_snap; + SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); diff --git a/dbms/src/Storages/DeltaMerge/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/DeltaValueSpace.h index 5cd1dc4afb4..2752ecdd669 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DeltaValueSpace.h @@ -5,6 +5,7 @@ #include #include #include +#include #include namespace DB @@ -17,8 +18,6 @@ class DeltaValueSpace; using DeltaValueSpacePtr = std::shared_ptr; struct WriteBatches; class StoragePool; -struct StorageSnapshot; -using StorageSnapshotPtr = std::shared_ptr; struct DMContext; struct BlockOrDelete diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index daccaa6cf88..3521a3ffdae 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -253,18 +253,21 @@ bool Segment::write(DMContext & dm_context, const HandleRange & delete_range) return delta->appendDeleteRange(dm_context, delete_range); } -SegmentSnapshot Segment::createSnapshot(const DMContext & dm_context, bool is_update) const +SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool is_update) const { - return {delta->createSnapshot(dm_context, is_update), stable}; + auto delta_snap = delta->createSnapshot(dm_context, is_update); + if (!delta_snap) + return {}; + return std::make_shared(delta_snap, stable); } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - SegmentSnapshot & segment_snap, - const HandleRanges & read_ranges, - const RSOperatorPtr & filter, - UInt64 max_version, - size_t expected_block_size) +BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const HandleRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) { LOG_TRACE(log, "Segment [" << segment_id << "] create InputStream"); @@ -278,15 +281,15 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, } else if (dm_context.read_stable_only) { - stream = segment_snap.stable->getInputStream(dm_context, read_info.read_columns, read_range, filter, max_version, false); + stream = segment_snap->stable->getInputStream(dm_context, read_info.read_columns, read_range, filter, max_version, false); } - else if (segment_snap.delta->rows == 0 && segment_snap.delta->deletes == 0 // - && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // - && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // + else if (segment_snap->delta->rows == 0 && segment_snap->delta->deletes == 0 // + && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // + && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // && !hasColumn(columns_to_read, TAG_COLUMN_ID)) { // No delta, let's try some optimizations. - stream = segment_snap.stable->getInputStream(dm_context, read_info.read_columns, read_range, filter, max_version, true); + stream = segment_snap->stable->getInputStream(dm_context, read_info.read_columns, read_range, filter, max_version, true); } else { @@ -294,8 +297,8 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, read_info.read_columns, read_range, filter, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -341,14 +344,16 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, size_t expected_block_size) { - SegmentSnapshot segment_snap = createSnapshot(dm_context); + auto segment_snap = createSnapshot(dm_context); + if (!segment_snap) + return {}; return getInputStream(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); } -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - SegmentSnapshot & segment_snap, - bool do_range_filter) +BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + bool do_range_filter) { ColumnDefines new_columns_to_read; @@ -367,10 +372,10 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, } } - BlockInputStreamPtr delta_stream = segment_snap.delta->prepareForStream(dm_context, new_columns_to_read); + BlockInputStreamPtr delta_stream = segment_snap->delta->prepareForStream(dm_context, new_columns_to_read); BlockInputStreamPtr stable_stream - = segment_snap.stable->getInputStream(dm_context, new_columns_to_read, range, EMPTY_FILTER, MAX_UINT64, false); + = segment_snap->stable->getInputStream(dm_context, new_columns_to_read, range, EMPTY_FILTER, MAX_UINT64, false); if (do_range_filter) { @@ -401,7 +406,9 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) { - SegmentSnapshot segment_snap = createSnapshot(dm_context); + auto segment_snap = createSnapshot(dm_context); + if (!segment_snap) + return {}; return getInputStreamRaw(dm_context, columns_to_read, segment_snap, true); } @@ -424,12 +431,12 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context) const return new_segment; } -StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const +StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const { LOG_DEBUG(log, "Segment [" << DB::toString(segment_id) - << "] prepare merge delta start. delta packs: " << DB::toString(segment_snap.delta->getPackCount()) - << ", delta total rows: " << DB::toString(segment_snap.delta->getRows())); + << "] prepare merge delta start. delta packs: " << DB::toString(segment_snap->delta->getPackCount()) + << ", delta total rows: " << DB::toString(segment_snap->delta->getRows())); EventRecorder recorder(ProfileEvents::DMDeltaMerge, ProfileEvents::DMDeltaMergeNS); @@ -439,8 +446,8 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, SegmentSn read_info.read_columns, range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -451,7 +458,7 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, SegmentSn data_stream = std::make_shared>( data_stream, read_info.read_columns, dm_context.min_version); - auto new_stable = createNewStable(dm_context, data_stream, segment_snap.stable->getId(), wbs); + auto new_stable = createNewStable(dm_context, data_stream, segment_snap->stable->getId(), wbs); LOG_DEBUG(log, "Segment [" << DB::toString(segment_id) << "] prepare merge delta done."); @@ -459,13 +466,13 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, SegmentSn } SegmentPtr Segment::applyMergeDelta(DMContext & context, - SegmentSnapshot & segment_snap, + const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, const StableValueSpacePtr & new_stable) const { LOG_DEBUG(log, "Before apply merge delta: " << info()); - auto later_packs = delta->checkHeadAndCloneTail(context, range, segment_snap.delta->packs, wbs); + auto later_packs = delta->checkHeadAndCloneTail(context, range, segment_snap->delta->packs, wbs); // Created references to tail pages' pages in "log" storage, we need to write them down. wbs.writeLogAndData(); @@ -578,7 +585,7 @@ Handle Segment::getSplitPointFast(DMContext & dm_context, const StableValueSpace return block.getByPosition(0).column->getInt(read_row_in_pack); } -Handle Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, SegmentSnapshot & segment_snap) const +Handle Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, const SegmentSnapshotPtr & segment_snap) const { EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS); @@ -590,8 +597,8 @@ Handle Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_ {handle}, range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -610,8 +617,8 @@ Handle Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_ {handle}, range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -646,15 +653,15 @@ Handle Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_ return split_handle; } -Segment::SplitInfo Segment::prepareSplit(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const +Segment::SplitInfo Segment::prepareSplit(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const { - if (!dm_context.enable_logical_split // - || segment_snap.stable->getPacks() <= 3 // - || segment_snap.delta->getRows() > segment_snap.stable->getRows()) + if (!dm_context.enable_logical_split // + || segment_snap->stable->getPacks() <= 3 // + || segment_snap->delta->getRows() > segment_snap->stable->getRows()) return prepareSplitPhysical(dm_context, segment_snap, wbs); else { - Handle split_point = getSplitPointFast(dm_context, segment_snap.stable); + Handle split_point = getSplitPointFast(dm_context, segment_snap->stable); bool bad_split_point = !range.check(split_point) || split_point == range.start; if (bad_split_point) return prepareSplitPhysical(dm_context, segment_snap, wbs); @@ -664,7 +671,7 @@ Segment::SplitInfo Segment::prepareSplit(DMContext & dm_context, SegmentSnapshot } Segment::SplitInfo -Segment::prepareSplitLogical(DMContext & dm_context, SegmentSnapshot & segment_snap, Handle split_point, WriteBatches & wbs) const +Segment::prepareSplitLogical(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, Handle split_point, WriteBatches & wbs) const { LOG_DEBUG(log, "Segment [" << segment_id << "] prepare split logical start"); @@ -684,10 +691,10 @@ Segment::prepareSplitLogical(DMContext & dm_context, SegmentSnapshot & segment_s DMFiles my_stable_files; DMFiles other_stable_files; - for (auto & dmfile : segment_snap.stable->getDMFiles()) + for (auto & dmfile : segment_snap->stable->getDMFiles()) { auto ori_ref_id = dmfile->refId(); - auto file_id = segment_snap.delta->storage_snap->data_reader.getNormalPageId(ori_ref_id); + auto file_id = segment_snap->delta->storage_snap->data_reader.getNormalPageId(ori_ref_id); auto file_parent_path = dm_context.extra_paths.getPath(file_id) + "/" + STABLE_FOLDER_NAME; auto my_dmfile_id = storage_pool.newDataPageId(); @@ -706,7 +713,7 @@ Segment::prepareSplitLogical(DMContext & dm_context, SegmentSnapshot & segment_s auto other_stable_id = storage_pool.newMetaPageId(); - auto my_stable = std::make_shared(segment_snap.stable->getId()); + auto my_stable = std::make_shared(segment_snap->stable->getId()); auto other_stable = std::make_shared(other_stable_id); my_stable->setFiles(my_stable_files, &dm_context, my_range); @@ -717,7 +724,7 @@ Segment::prepareSplitLogical(DMContext & dm_context, SegmentSnapshot & segment_s return {true, split_point, my_stable, other_stable}; } -Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const +Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const { LOG_DEBUG(log, "Segment [" << segment_id << "] prepare split physical start"); @@ -742,8 +749,8 @@ Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, Segment read_info.read_columns, my_range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -755,7 +762,7 @@ Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, Segment my_data = std::make_shared(my_data, EXTRA_HANDLE_COLUMN_NAME); my_data = std::make_shared>( my_data, read_info.read_columns, dm_context.min_version); - auto my_stable_id = segment_snap.stable->getId(); + auto my_stable_id = segment_snap->stable->getId(); my_new_stable = createNewStable(dm_context, my_data, my_stable_id, wbs); } @@ -767,8 +774,8 @@ Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, Segment read_info.read_columns, other_range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -799,10 +806,10 @@ Segment::SplitInfo Segment::prepareSplitPhysical(DMContext & dm_context, Segment return {false, split_point, my_new_stable, other_stable}; } -SegmentPair Segment::applySplit(DMContext & dm_context, // - SegmentSnapshot & segment_snap, - WriteBatches & wbs, - SplitInfo & split_info) const +SegmentPair Segment::applySplit(DMContext & dm_context, // + const SegmentSnapshotPtr & segment_snap, + WriteBatches & wbs, + SplitInfo & split_info) const { LOG_DEBUG(log, "Segment [" << segment_id << "] apply split"); @@ -810,7 +817,7 @@ SegmentPair Segment::applySplit(DMContext & dm_context, // HandleRange other_range = {split_info.split_point, range.end}; Packs empty_packs; - Packs * head_packs = split_info.is_logical ? &empty_packs : &segment_snap.delta->packs; + Packs * head_packs = split_info.is_logical ? &empty_packs : &segment_snap->delta->packs; auto my_delta_packs = delta->checkHeadAndCloneTail(dm_context, my_range, *head_packs, wbs); auto other_delta_packs = delta->checkHeadAndCloneTail(dm_context, other_range, *head_packs, wbs); @@ -879,12 +886,12 @@ SegmentPtr Segment::merge(DMContext & dm_context, const SegmentPtr & left, const return merged; } -StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // - const SegmentPtr & left, - SegmentSnapshot & left_snap, - const SegmentPtr & right, - SegmentSnapshot & right_snap, - WriteBatches & wbs) +StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // + const SegmentPtr & left, + const SegmentSnapshotPtr & left_snap, + const SegmentPtr & right, + const SegmentSnapshotPtr & right_snap, + WriteBatches & wbs) { LOG_DEBUG(left->log, "Segment [" << left->segmentId() << "] and [" << right->segmentId() << "] prepare merge start"); @@ -892,14 +899,14 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // throw Exception("The ranges of merge segments are not consecutive: first end: " + DB::toString(left->range.end) + ", second start: " + DB::toString(right->range.start)); - auto getStream = [&](const SegmentPtr & segment, SegmentSnapshot & segment_snap) { + auto getStream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) { auto read_info = segment->getReadInfo(dm_context, *dm_context.store_columns, segment_snap); BlockInputStreamPtr stream = segment->getPlacedStream(dm_context, read_info.read_columns, segment->range, EMPTY_FILTER, - segment_snap.stable, - segment_snap.delta, + segment_snap->stable, + segment_snap->delta, read_info.index_begin, read_info.index_end, read_info.index->entryCount(), @@ -928,9 +935,9 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // SegmentPtr Segment::applyMerge(DMContext & dm_context, // const SegmentPtr & left, - SegmentSnapshot & left_snap, + const SegmentSnapshotPtr & left_snap, const SegmentPtr & right, - SegmentSnapshot & right_snap, + const SegmentSnapshotPtr & right_snap, WriteBatches & wbs, const StableValueSpacePtr & merged_stable) { @@ -938,8 +945,8 @@ SegmentPtr Segment::applyMerge(DMContext & dm_context, // HandleRange merged_range = {left->range.start, right->range.end}; - auto left_tail_packs = left->delta->checkHeadAndCloneTail(dm_context, merged_range, left_snap.delta->packs, wbs); - auto right_tail_packs = right->delta->checkHeadAndCloneTail(dm_context, merged_range, right_snap.delta->packs, wbs); + auto left_tail_packs = left->delta->checkHeadAndCloneTail(dm_context, merged_range, left_snap->delta->packs, wbs); + auto right_tail_packs = right->delta->checkHeadAndCloneTail(dm_context, merged_range, right_snap->delta->packs, wbs); // Created references to tail pages' pages in "log" storage, we need to write them down. wbs.writeLogAndData(); @@ -1032,14 +1039,14 @@ String Segment::info() const template Segment::ReadInfo -Segment::getReadInfo(const DMContext & dm_context, const ColumnDefines & read_columns, SegmentSnapshot & segment_snap) const +Segment::getReadInfo(const DMContext & dm_context, const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap) const { LOG_DEBUG(log, "getReadInfo start"); auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(), read_columns); - segment_snap.delta->prepare(dm_context, new_read_columns); + segment_snap->delta->prepare(dm_context, new_read_columns); - DeltaIndexPtr delta_index = ensurePlace(dm_context, segment_snap.stable, segment_snap.delta); + DeltaIndexPtr delta_index = ensurePlace(dm_context, segment_snap->stable, segment_snap->delta); auto index_begin = DeltaIndex::begin(delta_index); auto index_end = DeltaIndex::end(delta_index); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index cc7509d9f12..b9d689b9626 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -18,6 +18,7 @@ namespace DM class Segment; struct SegmentSnapshot; +using SegmentSnapshotPtr = std::shared_ptr; class StableValueSpace; using StableValueSpacePtr = std::shared_ptr; class DeltaValueSpace; @@ -28,14 +29,12 @@ using SegmentPair = std::pair; using Segments = std::vector; /// A structure stores the informations to constantly read a segment instance. -struct SegmentSnapshot +struct SegmentSnapshot : private boost::noncopyable { DeltaSnapshotPtr delta; StableValueSpacePtr stable; - SegmentSnapshot() = default; - - explicit operator bool() { return (bool)delta; } + SegmentSnapshot(const DeltaSnapshotPtr & delta_, const StableValueSpacePtr & stable_) : delta(delta_), stable(stable_) {} }; /// A segment contains many rows of a table. A table is split into segments by consecutive ranges. @@ -101,15 +100,15 @@ class Segment : private boost::noncopyable bool write(DMContext & dm_context, const Block & block); // For test only bool write(DMContext & dm_context, const HandleRange & delete_range); - SegmentSnapshot createSnapshot(const DMContext & dm_context, bool is_update = false) const; + SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool is_update = false) const; - BlockInputStreamPtr getInputStream(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - SegmentSnapshot & segment_snap, - const HandleRanges & read_ranges, - const RSOperatorPtr & filter, - UInt64 max_version, - size_t expected_block_size); + BlockInputStreamPtr getInputStream(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const HandleRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); BlockInputStreamPtr getInputStream(const DMContext & dm_context, const ColumnDefines & columns_to_read, @@ -118,10 +117,10 @@ class Segment : private boost::noncopyable UInt64 max_version = MAX_UINT64, size_t expected_block_size = DEFAULT_BLOCK_SIZE); - BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - SegmentSnapshot & segment_snap, - bool do_range_filter); + BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + bool do_range_filter); BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read); @@ -129,28 +128,29 @@ class Segment : private boost::noncopyable /// split(), merge() and mergeDelta() are only used in test cases. SegmentPair split(DMContext & dm_context) const; - SplitInfo prepareSplit(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const; - SegmentPair applySplit(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs, SplitInfo & split_info) const; + SplitInfo prepareSplit(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const; + SegmentPair + applySplit(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, SplitInfo & split_info) const; static SegmentPtr merge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right); - static StableValueSpacePtr prepareMerge(DMContext & dm_context, // - const SegmentPtr & left, - SegmentSnapshot & left_snap, - const SegmentPtr & right, - SegmentSnapshot & right_snap, - WriteBatches & wbs); + static StableValueSpacePtr prepareMerge(DMContext & dm_context, // + const SegmentPtr & left, + const SegmentSnapshotPtr & left_snap, + const SegmentPtr & right, + const SegmentSnapshotPtr & right_snap, + WriteBatches & wbs); static SegmentPtr applyMerge(DMContext & dm_context, // const SegmentPtr & left, - SegmentSnapshot & left_snap, + const SegmentSnapshotPtr & left_snap, const SegmentPtr & right, - SegmentSnapshot & right_snap, + const SegmentSnapshotPtr & right_snap, WriteBatches & wbs, const StableValueSpacePtr & merged_stable); SegmentPtr mergeDelta(DMContext & dm_context) const; - StableValueSpacePtr prepareMergeDelta(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const; + StableValueSpacePtr prepareMergeDelta(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const; SegmentPtr applyMergeDelta(DMContext & dm_context, - SegmentSnapshot & segment_snap, + const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs, const StableValueSpacePtr & new_stable) const; @@ -200,7 +200,7 @@ class Segment : private boost::noncopyable private: template - ReadInfo getReadInfo(const DMContext & dm_context, const ColumnDefines & read_columns, SegmentSnapshot & segment_snap) const; + ReadInfo getReadInfo(const DMContext & dm_context, const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap) const; template static ColumnDefines arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); @@ -218,12 +218,15 @@ class Segment : private boost::noncopyable size_t expected_block_size) const; /// Merge delta & stable, and then take the middle one. - Handle getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, SegmentSnapshot & segment_snap) const; + Handle getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, const SegmentSnapshotPtr & segment_snap) const; /// Only look up in the stable vs. Handle getSplitPointFast(DMContext & dm_context, const StableValueSpacePtr & stable_snap) const; - SplitInfo prepareSplitLogical(DMContext & dm_context, SegmentSnapshot & segment_snap, Handle split_point, WriteBatches & wbs) const; - SplitInfo prepareSplitPhysical(DMContext & dm_context, SegmentSnapshot & segment_snap, WriteBatches & wbs) const; + SplitInfo prepareSplitLogical(DMContext & dm_context, // + const SegmentSnapshotPtr & segment_snap, + Handle split_point, + WriteBatches & wbs) const; + SplitInfo prepareSplitPhysical(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const; /// Make sure that all delta packs have been placed. diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index c4c2e437b04..ac26cad9337 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -11,18 +11,18 @@ namespace DM struct SegmentReadTask { - SegmentPtr segment; - SegmentSnapshot read_snapshot; - HandleRanges ranges; + SegmentPtr segment; + SegmentSnapshotPtr read_snapshot; + HandleRanges ranges; - explicit SegmentReadTask(const SegmentPtr & segment_, const SegmentSnapshot & read_snapshot_) + explicit SegmentReadTask(const SegmentPtr & segment_, const SegmentSnapshotPtr & read_snapshot_) : segment(segment_), read_snapshot(read_snapshot_) { } - SegmentReadTask(const SegmentPtr & segment_, // - const SegmentSnapshot & read_snapshot_, - const HandleRanges & ranges_) + SegmentReadTask(const SegmentPtr & segment_, // + const SegmentSnapshotPtr & read_snapshot_, + const HandleRanges & ranges_) : segment(segment_), read_snapshot(read_snapshot_), ranges(ranges_) { }