diff --git a/.gitignore b/.gitignore index 5dcfbeea25d..77fad881eb3 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,10 @@ # vscode clangd cache .cache +# JSON Compilation Database Format Specification +# https://clang.llvm.org/docs/JSONCompilationDatabase.html +compile_commands.json + # git patch reject report *.rej diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 7601314e9d1..0f17111cd03 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -58,7 +58,8 @@ std::unordered_map> FailPointHelper::f M(random_slow_page_storage_remove_expired_snapshots) \ M(random_slow_page_storage_list_all_live_files) \ M(force_set_safepoint_when_decode_block) \ - M(force_set_page_data_compact_batch) + M(force_set_page_data_compact_batch) \ + M(force_set_dtfile_exist_when_acquire_id) #define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \ M(pause_after_learner_read) \ diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 4f1578ebda6..adba63e6554 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -163,10 +163,11 @@ DeltaPacks DeltaValueSpace::checkHeadAndCloneTail(DMContext & context, } else if (auto f = pack->tryToFile(); f) { - auto new_ref_id = context.storage_pool.newDataPageId(); - auto file_id = f->getFile()->fileId(); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto file_id = f->getFile()->fileId(); wbs.data.putRefPage(new_ref_id, file_id); - auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id); + auto file_parent_path = delegator.getDTFilePath(file_id); auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path); auto new_pack = f->cloneWith(context, new_file, target_range); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 47f7e77b562..da6515fa9d3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -571,7 +571,7 @@ std::tuple DeltaMergeStore::preAllocateIngestFile() auto delegator = path_pool.getStableDiskDelegator(); auto parent_path = delegator.choosePath(); - auto new_id = storage_pool.newDataPageId(); + auto new_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); return {parent_path, new_id}; } @@ -679,7 +679,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, /// Generate DMFile instance with a new ref_id pointed to the file_id. auto file_id = file->fileId(); auto & file_parent_path = file->parentPath(); - auto ref_id = storage_pool.newDataPageId(); + auto ref_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path); auto pack = std::make_shared(*dm_context, ref_file, segment_range); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8dd20040609..f1f1d0553cb 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -19,6 +20,7 @@ #include #include #include +#include #include #include @@ -110,19 +112,19 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // // When the input_stream is not mvcc, we assume the rows in this input_stream is most valid and make it not tend to be gc. size_t cur_effective_num_rows = block.rows(); - size_t cur_not_clean_rows = 1; - size_t gc_hint_version = UINT64_MAX; + size_t cur_not_clean_rows = 1; + size_t gc_hint_version = UINT64_MAX; if (mvcc_stream) { cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); - cur_not_clean_rows = mvcc_stream->getNotCleanRows(); - gc_hint_version = mvcc_stream->getGCHintVersion(); + cur_not_clean_rows = mvcc_stream->getNotCleanRows(); + gc_hint_version = mvcc_stream->getGCHintVersion(); } DMFileBlockOutputStream::BlockProperty block_property; block_property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows; - block_property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows; - block_property.gc_hint_version = gc_hint_version; + block_property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows; + block_property.gc_hint_version = gc_hint_version; output_stream->write(block, block_property); } @@ -138,19 +140,19 @@ StableValueSpacePtr createNewStable(DMContext & context, PageId stable_id, WriteBatches & wbs) { - auto delegate = context.path_pool.getStableDiskDelegator(); - auto store_path = delegate.choosePath(); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto store_path = delegator.choosePath(); DMFileBlockOutputStream::Flags flags; flags.setSingleFile(context.db_context.getSettingsRef().dt_enable_single_file_mode_dmfile); - PageId dmfile_id = context.storage_pool.newDataPageId(); - auto dmfile = writeIntoNewDMFile(context, schema_snap, input_stream, dmfile_id, store_path, flags); - auto stable = std::make_shared(stable_id); - stable->setFiles({dmfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size)); + PageId dtfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto dtfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path, flags); + auto stable = std::make_shared(stable_id); + stable->setFiles({dtfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size)); stable->saveMeta(wbs.meta); - wbs.data.putExternal(dmfile_id, 0); - delegate.addDTFile(dmfile_id, dmfile->getBytesOnDisk(), store_path); + wbs.data.putExternal(dtfile_id, 0); + delegator.addDTFile(dtfile_id, dtfile->getBytesOnDisk(), store_path); return stable; } @@ -174,8 +176,7 @@ Segment::Segment(UInt64 epoch_, // delta(delta_), stable(stable_), log(&Logger::get("Segment")) -{ -} +{} SegmentPtr Segment::newSegment(DMContext & context, const ColumnDefinesPtr & schema, @@ -187,7 +188,7 @@ SegmentPtr Segment::newSegment(DMContext & context, { WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); - auto delta = std::make_shared(delta_id); + auto delta = std::make_shared(delta_id); auto stable = createNewStable(context, schema, std::make_shared(*schema), stable_id, wbs); auto segment = std::make_shared(INITIAL_EPOCH, range, segment_id, next_segment_id, delta, stable); @@ -207,51 +208,51 @@ SegmentPtr Segment::newSegment( DMContext & context, const ColumnDefinesPtr & schema, const RowKeyRange & rowkey_range, PageId segment_id, PageId next_segment_id) { return newSegment(context, - schema, - rowkey_range, - segment_id, - next_segment_id, - context.storage_pool.newMetaPageId(), - context.storage_pool.newMetaPageId()); + schema, + rowkey_range, + segment_id, + next_segment_id, + context.storage_pool.newMetaPageId(), + context.storage_pool.newMetaPageId()); } SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) { - Page page = context.storage_pool.meta().read(segment_id, nullptr); // not limit restore + Page page = context.storage_pool.meta().read(segment_id, nullptr); // not limit restore - ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + ReadBufferFromMemory buf(page.data.begin(), page.data.size()); SegmentFormat::Version version; readIntBinary(version, buf); - UInt64 epoch; + UInt64 epoch; RowKeyRange rowkey_range; - PageId next_segment_id, delta_id, stable_id; + PageId next_segment_id, delta_id, stable_id; readIntBinary(epoch, buf); switch (version) { case SegmentFormat::V1: { - HandleRange range; - readIntBinary(range.start, buf); - readIntBinary(range.end, buf); - rowkey_range = RowKeyRange::fromHandleRange(range); - break; - } + HandleRange range; + readIntBinary(range.start, buf); + readIntBinary(range.end, buf); + rowkey_range = RowKeyRange::fromHandleRange(range); + break; + } case SegmentFormat::V2: { - rowkey_range = RowKeyRange::deserialize(buf); - break; - } - default: - throw Exception("Illegal version" + DB::toString(version), ErrorCodes::LOGICAL_ERROR); + rowkey_range = RowKeyRange::deserialize(buf); + break; + } + default: + throw Exception("Illegal version" + DB::toString(version), ErrorCodes::LOGICAL_ERROR); } readIntBinary(next_segment_id, buf); readIntBinary(delta_id, buf); readIntBinary(stable_id, buf); - auto delta = DeltaValueSpace::restore(context, rowkey_range, delta_id); - auto stable = StableValueSpace::restore(context, stable_id); + auto delta = DeltaValueSpace::restore(context, rowkey_range, delta_id); + auto stable = StableValueSpace::restore(context, stable_id); auto segment = std::make_shared(epoch, rowkey_range, segment_id, next_segment_id, delta, stable); return segment; @@ -329,7 +330,7 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo { // If the snapshot is created for read, then the snapshot will contain all packs (cached and persisted) for read. // If the snapshot is created for update, then the snapshot will only contain the persisted packs. - auto delta_snap = delta->createSnapshot(dm_context, for_update, metric); + auto delta_snap = delta->createSnapshot(dm_context, for_update, metric); auto stable_snap = stable->createSnapshot(); if (!delta_snap || !stable_snap) return {}; @@ -362,7 +363,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex else if (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0 // && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // - && !hasColumn(columns_to_read, TAG_COLUMN_ID)) + && !hasColumn(columns_to_read, TAG_COLUMN_ID)) { // No delta, let's try some optimizations. stream = segment_snap->stable->getInputStream( @@ -371,15 +372,15 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex else { stream = getPlacedStream(dm_context, - *read_info.read_columns, - read_range, - filter, - segment_snap->stable, - read_info.getDeltaReader(), - read_info.index_begin, - read_info.index_end, - expected_block_size, - max_version); + *read_info.read_columns, + read_range, + filter, + segment_snap->stable, + read_info.getDeltaReader(), + read_info.index_begin, + read_info.index_end, + expected_block_size, + max_version); } stream = std::make_shared>(stream, read_range, 0); @@ -393,8 +394,8 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex if (read_ranges.size() == 1) { LOG_TRACE(log, - "Segment [" << segment_id << "] is read by max_version: " << max_version << ", 1" - << " range: " << DB::DM::toDebugString(read_ranges)); + "Segment [" << segment_id << "] is read by max_version: " << max_version << ", 1" + << " range: " << DB::DM::toDebugString(read_ranges)); RowKeyRange real_range = rowkey_range.shrink(read_ranges[0]); if (real_range.none()) stream = std::make_shared(toEmptyBlock(*read_info.read_columns)); @@ -412,8 +413,8 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_contex } LOG_TRACE(log, - "Segment [" << segment_id << "] is read by max_version: " << max_version << ", " << streams.size() - << " ranges: " << DB::DM::toDebugString(read_ranges)); + "Segment [" << segment_id << "] is read by max_version: " << max_version << ", " << streams.size() + << " ranges: " << DB::DM::toDebugString(read_ranges)); if (streams.empty()) stream = std::make_shared(toEmptyBlock(*read_info.read_columns)); @@ -446,14 +447,14 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, {data_range}); BlockInputStreamPtr data_stream = getPlacedStream(dm_context, - *read_info.read_columns, - data_range, - EMPTY_FILTER, - segment_snap->stable, - read_info.getDeltaReader(), - read_info.index_begin, - read_info.index_end, - expected_block_size); + *read_info.read_columns, + data_range, + EMPTY_FILTER, + segment_snap->stable, + read_info.getDeltaReader(), + read_info.index_begin, + read_info.index_end, + expected_block_size); data_stream = std::make_shared>(data_stream, data_range, 0); @@ -492,9 +493,9 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_con BlockInputStreamPtr delta_stream = std::make_shared(dm_context, // - segment_snap->delta, - new_columns_to_read, - this->rowkey_range); + segment_snap->delta, + new_columns_to_read, + this->rowkey_range); BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, rowkey_range, EMPTY_FILTER, MAX_UINT64, expected_block_size, false); @@ -537,7 +538,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, con SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); - auto segment_snap = createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfDeltaMerge); + auto segment_snap = createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfDeltaMerge); if (!segment_snap) return {}; @@ -546,7 +547,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & wbs.writeLogAndData(); new_stable->enableDMFilesGC(); - auto lock = mustGetUpdateLock(); + auto lock = mustGetUpdateLock(); auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable); wbs.writeAll(); @@ -559,9 +560,9 @@ StableValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_con WriteBatches & wbs) const { LOG_INFO(log, - "Segment [" << DB::toString(segment_id) - << "] prepare merge delta start. delta packs: " << DB::toString(segment_snap->delta->getPackCount()) - << ", delta total rows: " << DB::toString(segment_snap->delta->getRows())); + "Segment [" << DB::toString(segment_id) + << "] prepare merge delta start. delta packs: " << DB::toString(segment_snap->delta->getPackCount()) + << ", delta total rows: " << DB::toString(segment_snap->delta->getRows())); EventRecorder recorder(ProfileEvents::DMDeltaMerge, ProfileEvents::DMDeltaMergeNS); @@ -590,11 +591,11 @@ SegmentPtr Segment::applyMergeDelta(DMContext & context, new_delta->saveMeta(wbs); auto new_me = std::make_shared(epoch + 1, // - rowkey_range, - segment_id, - next_segment_id, - new_delta, - new_stable); + rowkey_range, + segment_id, + next_segment_id, + new_delta, + new_stable); // avoid recheck whether to do DeltaMerge using the same gc_safe_point new_me->setLastCheckGCSafePoint(context.min_version); @@ -615,7 +616,7 @@ SegmentPtr Segment::applyMergeDelta(DMContext & context, SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); - auto segment_snap = createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentSplit); + auto segment_snap = createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentSplit); if (!segment_snap) return {}; @@ -629,7 +630,7 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche split_info.my_stable->enableDMFilesGC(); split_info.other_stable->enableDMFilesGC(); - auto lock = mustGetUpdateLock(); + auto lock = mustGetUpdateLock(); auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info); wbs.writeAll(); @@ -642,7 +643,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co // FIXME: this method does not consider invalid packs in stable dmfiles. EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS); - auto stable_rows = stable_snap->getRows(); + auto stable_rows = stable_snap->getRows(); if (unlikely(!stable_rows)) throw Exception("No stable rows"); @@ -651,14 +652,14 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co auto & dmfiles = stable_snap->getDMFiles(); DMFilePtr read_file; - size_t file_index = 0; - auto read_pack = std::make_shared(); - size_t read_row_in_pack = 0; + size_t file_index = 0; + auto read_pack = std::make_shared(); + size_t read_row_in_pack = 0; size_t cur_rows = 0; for (size_t index = 0; index < dmfiles.size(); index++) { - auto & file = dmfiles[index]; + auto & file = dmfiles[index]; size_t rows_in_file = file->getRows(); cur_rows += rows_in_file; if (cur_rows > split_row_index) @@ -672,7 +673,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co { cur_rows -= pack_stats[pack_id].rows; - read_file = file; + read_file = file; file_index = index; read_pack->insert(pack_id); read_row_in_pack = split_row_index - cur_rows; @@ -687,15 +688,15 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co throw Exception("Logical error: failed to find split point"); DMFileBlockInputStream stream(dm_context.db_context, - MAX_UINT64, - false, - dm_context.hash_salt, - read_file, - {getExtraHandleColumnDefine(is_common_handle)}, - RowKeyRange::newAll(is_common_handle, rowkey_column_size), - EMPTY_FILTER, - stable_snap->getColumnCaches()[file_index], - read_pack); + MAX_UINT64, + false, + dm_context.hash_salt, + read_file, + {getExtraHandleColumnDefine(is_common_handle)}, + RowKeyRange::newAll(is_common_handle, rowkey_column_size), + EMPTY_FILTER, + stable_snap->getColumnCaches()[file_index], + read_pack); stream.readPrefix(); auto block = stream.read(); @@ -704,7 +705,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co stream.readSuffix(); RowKeyColumnContainer rowkey_column(block.getByPosition(0).column, is_common_handle); - RowKeyValue split_point(rowkey_column.getRowKeyValue(read_row_in_pack)); + RowKeyValue split_point(rowkey_column.getRowKeyValue(read_row_in_pack)); if (!rowkey_range.check(split_point.toRowKeyValueRef()) @@ -726,8 +727,8 @@ Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, c { EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS); - auto & pk_col = getExtraHandleColumnDefine(is_common_handle); - auto pk_col_defs = std::make_shared(ColumnDefines{pk_col}); + auto & pk_col = getExtraHandleColumnDefine(is_common_handle); + auto pk_col_defs = std::make_shared(ColumnDefines{pk_col}); // We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column. auto delta_reader = read_info.getDeltaReader(pk_col_defs); @@ -735,14 +736,14 @@ Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, c { BlockInputStreamPtr stream = getPlacedStream(dm_context, - *pk_col_defs, - rowkey_range, - EMPTY_FILTER, - segment_snap->stable, - delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows); + *pk_col_defs, + rowkey_range, + EMPTY_FILTER, + segment_snap->stable, + delta_reader, + read_info.index_begin, + read_info.index_end, + dm_context.stable_pack_rows); stream = std::make_shared>(stream, rowkey_range, 0); @@ -760,14 +761,14 @@ Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, c } BlockInputStreamPtr stream = getPlacedStream(dm_context, - *pk_col_defs, - rowkey_range, - EMPTY_FILTER, - segment_snap->stable, - delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows); + *pk_col_defs, + rowkey_range, + EMPTY_FILTER, + segment_snap->stable, + delta_reader, + read_info.index_begin, + read_info.index_end, + dm_context.stable_pack_rows); stream = std::make_shared>(stream, rowkey_range, 0); @@ -799,7 +800,7 @@ Segment::getSplitPointSlow(DMContext & dm_context, const ReadInfo & read_info, c LOG_WARNING(log, __FUNCTION__ << " unexpected split_handle: " << split_point.toRowKeyValueRef().toDebugString() << ", should be in range " << rowkey_range.toDebugString() << ", exact_rows: " << DB::toString(exact_rows) - << ", cur count: " << DB::toString(count) << ", split_row_index: " << split_row_index); + << ", cur count: " << DB::toString(count) << ", split_row_index: " << split_row_index); return {}; } @@ -854,8 +855,8 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co if (my_range.none() || other_range.none()) { LOG_WARNING(log, - __FUNCTION__ << ": unexpected range! my_range: " << my_range.toDebugString() - << ", other_range: " << other_range.toDebugString() << ", aborted"); + __FUNCTION__ << ": unexpected range! my_range: " << my_range.toDebugString() << ", other_range: " << other_range.toDebugString() + << ", aborted"); return {}; } @@ -867,12 +868,12 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co auto delegate = dm_context.path_pool.getStableDiskDelegator(); for (auto & dmfile : segment_snap->stable->getDMFiles()) { - auto ori_ref_id = dmfile->refId(); - auto file_id = dmfile->fileId(); + auto ori_ref_id = dmfile->refId(); + auto file_id = dmfile->fileId(); auto file_parent_path = delegate.getDTFilePath(file_id); - auto my_dmfile_id = storage_pool.newDataPageId(); - auto other_dmfile_id = storage_pool.newDataPageId(); + auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); wbs.data.putRefPage(my_dmfile_id, file_id); wbs.data.putRefPage(other_dmfile_id, file_id); @@ -888,7 +889,7 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co auto other_stable_id = storage_pool.newMetaPageId(); - auto my_stable = std::make_shared(segment_snap->stable->getId()); + auto my_stable = std::make_shared(segment_snap->stable->getId()); auto other_stable = std::make_shared(other_stable_id); my_stable->setFiles(my_stable_files, my_range, &dm_context); @@ -936,14 +937,14 @@ std::optional Segment::prepareSplitPhysical(DMContext & auto my_delta_reader = read_info.getDeltaReader(schema_snap); BlockInputStreamPtr my_data = getPlacedStream(dm_context, - *read_info.read_columns, - my_range, - EMPTY_FILTER, - segment_snap->stable, - my_delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows); + *read_info.read_columns, + my_range, + EMPTY_FILTER, + segment_snap->stable, + my_delta_reader, + read_info.index_begin, + read_info.index_end, + dm_context.stable_pack_rows); my_data = std::make_shared>(my_data, my_range, 0); @@ -963,14 +964,14 @@ std::optional Segment::prepareSplitPhysical(DMContext & auto other_delta_reader = read_info.getDeltaReader(schema_snap); BlockInputStreamPtr other_data = getPlacedStream(dm_context, - *read_info.read_columns, - other_range, - EMPTY_FILTER, - segment_snap->stable, - other_delta_reader, - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows); + *read_info.read_columns, + other_range, + EMPTY_FILTER, + segment_snap->stable, + other_delta_reader, + read_info.index_begin, + read_info.index_end, + dm_context.stable_pack_rows); other_data = std::make_shared>(other_data, other_range, 0); @@ -1021,18 +1022,18 @@ SegmentPair Segment::applySplit(DMContext & dm_context, // auto other_delta = std::make_shared(other_delta_id, other_delta_packs); auto new_me = std::make_shared(this->epoch + 1, // - my_range, - this->segment_id, - other_segment_id, - my_delta, - split_info.my_stable); + my_range, + this->segment_id, + other_segment_id, + my_delta, + split_info.my_stable); auto other = std::make_shared(INITIAL_EPOCH, // - other_range, - other_segment_id, - this->next_segment_id, - other_delta, - split_info.other_stable); + other_range, + other_segment_id, + this->next_segment_id, + other_delta, + split_info.other_stable); new_me->delta->saveMeta(wbs); new_me->stable->saveMeta(wbs.meta); @@ -1087,20 +1088,20 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, if (unlikely(compare(left->rowkey_range.getEnd(), right->rowkey_range.getStart()) != 0 || left->next_segment_id != right->segment_id)) throw Exception("The ranges of merge segments are not consecutive: first end: " + left->rowkey_range.getEnd().toDebugString() - + ", second start: " + right->rowkey_range.getStart().toDebugString()); + + ", second start: " + right->rowkey_range.getStart().toDebugString()); auto getStream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) { auto read_info = segment->getReadInfo( dm_context, *schema_snap, segment_snap, {RowKeyRange::newAll(left->is_common_handle, left->rowkey_column_size)}); BlockInputStreamPtr stream = getPlacedStream(dm_context, - *read_info.read_columns, - segment->rowkey_range, - EMPTY_FILTER, - segment_snap->stable, - read_info.getDeltaReader(), - read_info.index_begin, - read_info.index_end, - dm_context.stable_pack_rows); + *read_info.read_columns, + segment->rowkey_range, + EMPTY_FILTER, + segment_snap->stable, + read_info.getDeltaReader(), + read_info.index_begin, + read_info.index_end, + dm_context.stable_pack_rows); stream = std::make_shared>(stream, segment->rowkey_range, 0); stream = std::make_shared>(stream, EXTRA_HANDLE_COLUMN_ID, dm_context.is_common_handle); @@ -1161,11 +1162,11 @@ SegmentPtr Segment::applyMerge(DMContext & dm_context, // auto merged_delta = std::make_shared(left->delta->getId(), merged_packs); auto merged = std::make_shared(left->epoch + 1, // - merged_range, - left->segment_id, - right->next_segment_id, - merged_delta, - merged_stable); + merged_range, + left->segment_id, + right->next_segment_id, + merged_delta, + merged_stable); // Store new meta data merged->delta->saveMeta(wbs); @@ -1218,9 +1219,9 @@ void Segment::placeDeltaIndex(DMContext & dm_context) if (!segment_snap) return; getReadInfo(dm_context, - /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, - segment_snap, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, + segment_snap, + {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); } String Segment::simpleInfo() const @@ -1393,9 +1394,9 @@ std::pair Segment::ensurePlace(const DMContext & my_delta_index->update(my_delta_tree, my_placed_rows, my_placed_deletes); LOG_DEBUG(log, - __FUNCTION__ << simpleInfo() << " read_ranges:" << DB::DM::toDebugString(read_ranges) << ", place item count:" << items.size() - << ", shared delta index: " << delta_snap->getSharedDeltaIndex()->toString() - << ", my delta index: " << my_delta_index->toString()); + __FUNCTION__ << simpleInfo() << " read_ranges:" << DB::DM::toDebugString(read_ranges) << ", place item count:" << items.size() + << ", shared delta index: " << delta_snap->getSharedDeltaIndex()->toString() + << ", my delta index: " << my_delta_index->toString()); return {my_delta_index, fully_indexed}; } diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index e97ddac8f2e..1b3a2472f78 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -1,16 +1,22 @@ +#include #include #include #include #include #include +#include namespace DB { +namespace FailPoints +{ +extern const char force_set_dtfile_exist_when_acquire_id[]; +} // namespace FailPoints namespace DM { enum class StorageType { - Log = 1, + Log = 1, Data = 2, Meta = 3, }; @@ -18,9 +24,9 @@ enum class StorageType PageStorage::Config extractConfig(const Settings & settings, StorageType subtype) { #define SET_CONFIG(NAME) \ - config.num_write_slots = settings.dt_storage_pool_##NAME##_write_slots; \ - config.gc_min_files = settings.dt_storage_pool_##NAME##_gc_min_file_num; \ - config.gc_min_bytes = settings.dt_storage_pool_##NAME##_gc_min_bytes; \ + config.num_write_slots = settings.dt_storage_pool_##NAME##_write_slots; \ + config.gc_min_files = settings.dt_storage_pool_##NAME##_gc_min_file_num; \ + config.gc_min_bytes = settings.dt_storage_pool_##NAME##_gc_min_bytes; \ config.gc_min_legacy_num = settings.dt_storage_pool_##NAME##_gc_min_legacy_num; \ config.gc_max_valid_rate = settings.dt_storage_pool_##NAME##_gc_max_valid_rate; @@ -28,17 +34,17 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype switch (subtype) { - case StorageType::Log: - SET_CONFIG(log); - break; - case StorageType::Data: - SET_CONFIG(data); - break; - case StorageType::Meta: - SET_CONFIG(meta); - break; - default: - throw Exception("Unknown subtype in extractConfig: " + DB::toString(static_cast(subtype))); + case StorageType::Log: + SET_CONFIG(log); + break; + case StorageType::Data: + SET_CONFIG(data); + break; + case StorageType::Meta: + SET_CONFIG(meta); + break; + default: + throw Exception("Unknown subtype in extractConfig: " + DB::toString(static_cast(subtype))); } #undef SET_CONFIG @@ -47,29 +53,23 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings) : // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible - log_storage(name + ".log", - path_pool.getPSDiskDelegatorMulti("log"), - extractConfig(settings, StorageType::Log), - global_ctx.getFileProvider(), - global_ctx.getTiFlashMetrics()), + log_storage( + name + ".log", path_pool.getPSDiskDelegatorMulti("log"), extractConfig(settings, StorageType::Log), global_ctx.getFileProvider()), // The iops in data_storage is low, only use the first disk for storing data data_storage(name + ".data", - path_pool.getPSDiskDelegatorSingle("data"), - extractConfig(settings, StorageType::Data), - global_ctx.getFileProvider(), - global_ctx.getTiFlashMetrics()), + path_pool.getPSDiskDelegatorSingle("data"), + extractConfig(settings, StorageType::Data), + global_ctx.getFileProvider()), // The iops in meta_storage is relatively high, use multi-disks if possible meta_storage(name + ".meta", - path_pool.getPSDiskDelegatorMulti("meta"), - extractConfig(settings, StorageType::Meta), - global_ctx.getFileProvider(), - global_ctx.getTiFlashMetrics()), + path_pool.getPSDiskDelegatorMulti("meta"), + extractConfig(settings, StorageType::Meta), + global_ctx.getFileProvider()), max_log_page_id(0), max_data_page_id(0), max_meta_page_id(0), global_context(global_ctx) -{ -} +{} void StoragePool::restore() { @@ -77,7 +77,7 @@ void StoragePool::restore() data_storage.restore(); meta_storage.restore(); - max_log_page_id = log_storage.getMaxId(); + max_log_page_id = log_storage.getMaxId(); max_data_page_id = data_storage.getMaxId(); max_meta_page_id = meta_storage.getMaxId(); } @@ -89,6 +89,36 @@ void StoragePool::drop() log_storage.drop(); } +PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who) +{ + // In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID. + // After TiFlash process restored, the ID will be inserted into the stable delegator, but we may + // get a duplicated ID from the `storage_pool.data`. (tics#2756) + PageId dtfile_id; + do + { + dtfile_id = ++max_data_page_id; + + auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false); + fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, { + static size_t fail_point_called = 0; + if (existed_path.empty() && fail_point_called % 10 == 0) + { + existed_path = ""; + } + fail_point_called++; + }); + if (likely(existed_path.empty())) + { + break; + } + // else there is a DTFile with that id, continue to acquire a new ID. + LOG_WARNING(&Poco::Logger::get(who), + fmt::format("The DTFile is already exists, continute to acquire another ID. [path={}] [id={}]", existed_path, dtfile_id)); + } while (true); + return dtfile_id; +} + bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_period) { { @@ -102,8 +132,8 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio } bool done_anything = false; - auto write_limiter = global_context.getWriteLimiter(); - auto read_limiter = global_context.getReadLimiter(); + auto write_limiter = global_context.getWriteLimiter(); + auto read_limiter = global_context.getReadLimiter(); // FIXME: The global_context.settings is mutable, we need a way to reload thses settings. // auto config = extractConfig(settings, StorageType::Meta); // meta_storage.reloadSettings(config); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index ba206c12afe..82b076be9d9 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -10,6 +10,7 @@ namespace DB struct Settings; class Context; class StoragePathPool; +class StableDiskDelegator; namespace DM { @@ -19,10 +20,10 @@ static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60); class StoragePool : private boost::noncopyable { public: - using Clock = std::chrono::system_clock; + using Clock = std::chrono::system_clock; using Timepoint = Clock::time_point; - using Duration = Clock::duration; - using Seconds = std::chrono::seconds; + using Duration = Clock::duration; + using Seconds = std::chrono::seconds; StoragePool(const String & name, StoragePathPool & path_pool, const Context & global_ctx, const Settings & settings); @@ -33,9 +34,10 @@ class StoragePool : private boost::noncopyable PageId maxMetaPageId() { return max_meta_page_id; } PageId newLogPageId() { return ++max_log_page_id; } - PageId newDataPageId() { return ++max_data_page_id; } PageId newMetaPageId() { return ++max_meta_page_id; } + PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); + PageStorage & log() { return log_storage; } PageStorage & data() { return data_storage; } PageStorage & meta() { return meta_storage; } @@ -58,7 +60,7 @@ class StoragePool : private boost::noncopyable std::mutex mutex; - const Context& global_context; + const Context & global_context; }; struct StorageSnapshot : private boost::noncopyable @@ -67,8 +69,7 @@ struct StorageSnapshot : private boost::noncopyable : log_reader(storage.log(), snapshot_read ? storage.log().getSnapshot() : nullptr, read_limiter), data_reader(storage.data(), snapshot_read ? storage.data().getSnapshot() : nullptr, read_limiter), meta_reader(storage.meta(), snapshot_read ? storage.meta().getSnapshot() : nullptr, read_limiter) - { - } + {} PageReader log_reader; PageReader data_reader; diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 5dd554ce39e..af3fd7e4f2f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -32,8 +32,7 @@ inline ::testing::AssertionResult HandleRangeCompare(const char * lhs_exp { if (lhs == rhs) return ::testing::AssertionSuccess(); - else - return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); } /// helper functions for comparing HandleRange inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_expr, @@ -43,8 +42,7 @@ inline ::testing::AssertionResult RowKeyRangeCompare(const char * lhs_exp { if (lhs == rhs) return ::testing::AssertionSuccess(); - else - return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); } #define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) #define ASSERT_ROWKEY_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::RowKeyRangeCompare, val1, val2) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 69f8ecbe45c..6e67b825c32 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -1323,7 +1323,7 @@ try /* max_version= */ tso1, EMPTY_FILTER, /* expected_block_size= */ 1024); - ASSERT_EQ(ins.size(), 1UL); + ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; size_t num_rows_read = 0; @@ -1345,7 +1345,7 @@ try num_rows_read += block.rows(); } in->readSuffix(); - EXPECT_EQ(num_rows_read, 32UL) << "Data [32, 128) before ingest should be erased, should only get [0, 32)"; + EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) before ingest should be erased, should only get [0, 32)"; } { @@ -1359,7 +1359,7 @@ try /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, /* expected_block_size= */ 1024); - ASSERT_EQ(ins.size(), 1UL); + ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; size_t num_rows_read = 0; @@ -1367,7 +1367,7 @@ try while (Block block = in->read()) num_rows_read += block.rows(); in->readSuffix(); - EXPECT_EQ(num_rows_read, 32UL) << "The rows number after ingest is not match"; + EXPECT_EQ(num_rows_read, 32) << "The rows number after ingest is not match"; } } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 596471cba38..1c249720754 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1033,10 +1033,10 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface> genDMFile(DMContext & context, const Block & block) { - auto file_id = context.storage_pool.newDataPageId(); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - auto delegate = context.path_pool.getStableDiskDelegator(); - auto store_path = delegate.choosePath(); + auto store_path = delegator.choosePath(); DMFileBlockOutputStream::Flags flags; flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2); @@ -1044,11 +1044,11 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface(*tableColumns()), input_stream, file_id, store_path, flags); - delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path); + delegator.addDTFile(file_id, dmfile->getBytesOnDisk(), store_path); - auto & pk_column = block.getByPosition(0).column; - auto min_pk = pk_column->getInt(0); - auto max_pk = pk_column->getInt(block.rows() - 1); + auto & pk_column = block.getByPosition(0).column; + auto min_pk = pk_column->getInt(0); + auto max_pk = pk_column->getInt(block.rows() - 1); HandleRange range(min_pk, max_pk + 1); return {RowKeyRange::fromHandleRange(range), {file_id}}; @@ -1080,7 +1080,6 @@ try auto file_parent_path = delegate.getDTFilePath(file_id); auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path); auto pack = std::make_shared(dmContext(), file, range); - delegate.addDTFile(file_id, file->getBytesOnDisk(), file_parent_path); WriteBatches wbs(*storage_pool); wbs.data.putExternal(file_id, 0); wbs.writeLogAndData(); diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 3b991da0b6c..2d3f1b33bb1 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -339,25 +340,29 @@ String StableDiskDelegator::choosePath() const return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); } -String StableDiskDelegator::getDTFilePath(UInt64 file_id) const +String StableDiskDelegator::getDTFilePath(UInt64 file_id, bool throw_on_not_exist) const { std::lock_guard lock{pool.mutex}; auto iter = pool.dt_file_path_map.find(file_id); if (likely(iter != pool.dt_file_path_map.end())) return pool.main_path_infos[iter->second].path + "/" + StoragePathPool::STABLE_FOLDER_NAME; - throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]"); + if (likely(throw_on_not_exist)) + throw Exception("Can not find path for DMFile [id=" + toString(file_id) + "]"); + return ""; } void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::string_view path) { path.remove_suffix(1 + strlen(StoragePathPool::STABLE_FOLDER_NAME)); // remove '/stable' added in listPathsForStable/getDTFilePath std::lock_guard lock{pool.mutex}; - if (auto iter = pool.dt_file_path_map.find(file_id); iter != pool.dt_file_path_map.end()) + if (auto iter = pool.dt_file_path_map.find(file_id); unlikely(iter != pool.dt_file_path_map.end())) { - auto & path_info = pool.main_path_infos[iter->second]; - pool.dt_file_path_map.erase(iter); - path_info.file_size_map.erase(file_id); + const auto & path_info = pool.main_path_infos[iter->second]; + throw DB::TiFlashException( + fmt::format("Try to add a DTFile with duplicated id. [id={}] [path={}] [existed_path={}]", file_id, path, path_info.path), + Errors::DeltaTree::Internal); } + UInt32 index = UINT32_MAX; for (size_t i = 0; i < pool.main_path_infos.size(); i++) { @@ -368,7 +373,8 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin } } if (unlikely(index == UINT32_MAX)) - throw Exception("Unrecognized path " + String(path)); + throw DB::TiFlashException( + fmt::format("Try to add a DTFile to an unrecognized path. [id={}] [path={}]", file_id, path), Errors::DeltaTree::Internal); pool.dt_file_path_map.emplace(file_id, index); pool.main_path_infos[index].file_size_map.emplace(file_id, file_size); // update global used size diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 587a3b7dcc4..ab26ff48f45 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -96,7 +96,9 @@ class StableDiskDelegator : private boost::noncopyable String choosePath() const; - String getDTFilePath(UInt64 file_id) const; + // Get the path of the DTFile with file_id. + // If throw_on_not_exist is false, return empty string when the path is not exists. + String getDTFilePath(UInt64 file_id, bool throw_on_not_exist = true) const; void addDTFile(UInt64 file_id, size_t file_size, std::string_view path); diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index e2183cf76e5..f7ac629b7b1 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -1,6 +1,11 @@ #include #include +namespace DB::FailPoints +{ +extern const char force_set_dtfile_exist_when_acquire_id[]; +} // namespace DB::FailPoints + int main(int argc, char ** argv) { DB::tests::TiFlashTestEnv::setupLogger(); @@ -8,6 +13,8 @@ int main(int argc, char ** argv) #ifdef FIU_ENABLE fiu_init(0); // init failpoint + + DB::FailPointHelper::enableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id); #endif ::testing::InitGoogleTest(&argc, argv);