Skip to content

Commit

Permalink
storage: Accepts a snapshot for segment replaceData API (#6385)
Browse files Browse the repository at this point in the history
ref #5237
  • Loading branch information
breezewish authored Nov 29, 2022
1 parent 8e1c7d1 commit f57ec4b
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 293 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
M(DT_SnapshotOfDeltaMerge) \
M(DT_SnapshotOfDeltaCompact) \
M(DT_SnapshotOfPlaceIndex) \
M(DT_SnapshotOfSegmentIngest) \
M(IOLimiterPendingBgWriteReq) \
M(IOLimiterPendingFgWriteReq) \
M(IOLimiterPendingBgReadReq) \
Expand Down
34 changes: 31 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
}
}

// We have to commit those file_ids to PageStorage, because as soon as packs are written into segments,
// they are visible for readers who require file_ids to be found in PageStorage.
// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
// they are visible for readers immediately, who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

bool ingest_success = segment->ingestColumnFiles(*dm_context, range.shrink(segment_range), column_files, clear_data_in_range);
Expand Down Expand Up @@ -389,8 +389,36 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
* │----------- Segment ----------│
* │-------- Ingest Range --------│
*/
const auto new_segment_or_null = segmentDangerouslyReplaceData(dm_context, segment, file);
auto delegate = dm_context.path_pool.getStableDiskDelegator();
auto file_provider = dm_context.db_context.getFileProvider();

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

// Generate DMFile instance with a new ref_id pointed to the file_id,
// because we may use the same DMFile to ingest into multiple segments.
auto new_page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto ref_file = DMFile::restore(
file_provider,
file->fileId(),
new_page_id,
file->parentPath(),
DMFile::ReadMetaMode::all());
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
// they are visible for readers immediately, who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

const auto new_segment_or_null = segmentDangerouslyReplaceData(dm_context, segment, ref_file);
const bool succeeded = new_segment_or_null != nullptr;

if (!succeeded)
{
// When segment is not valid anymore, replaceData will fail.
// In this case, we just discard this ref file.
wbs.rollbackWrittenLogAndData();
}

return succeeded;
}
else if (is_start_matching)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData(
{
LOG_INFO(log, "ReplaceData - Begin, segment={} data_file={}", segment->info(), data_file->path());

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

SegmentPtr new_segment;
{
std::unique_lock lock(read_write_mutex);
Expand All @@ -500,23 +498,18 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData(
}

auto segment_lock = segment->mustGetUpdateLock();
new_segment = segment->dangerouslyReplaceData(segment_lock, dm_context, data_file, wbs);
new_segment = segment->replaceData(segment_lock, dm_context, data_file);

RUNTIME_CHECK(compare(segment->getRowKeyRange().getEnd(), new_segment->getRowKeyRange().getEnd()) == 0, segment->info(), new_segment->info());
RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info());

wbs.writeLogAndData();
wbs.writeMeta();

segment->abandon(dm_context);
segments[segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[segment->segmentId()] = new_segment;

LOG_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", segment->info(), new_segment->info());
}

wbs.writeRemoves();

if constexpr (DM_RUN_CHECK)
check(dm_context.db_context);

Expand Down
62 changes: 27 additions & 35 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,50 +873,40 @@ SegmentPtr Segment::applyMergeDelta(const Segment::Lock & lock, //
return new_me;
}

SegmentPtr Segment::dangerouslyReplaceDataForTest(DMContext & dm_context, //
const DMFilePtr & data_file) const
SegmentPtr Segment::replaceData(const Segment::Lock & lock, //
DMContext & context,
const DMFilePtr & data_file,
SegmentSnapshotPtr segment_snap_opt) const
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());

auto lock = mustGetUpdateLock();
auto new_segment = dangerouslyReplaceData(lock, dm_context, data_file, wbs);

wbs.writeAll();
return new_segment;
}
LOG_DEBUG(log, "ReplaceData - Begin, snapshot_rows={} data_file={}", segment_snap_opt == nullptr ? "<none>" : std::to_string(segment_snap_opt->getRows()), data_file->path());

SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, //
DMContext & dm_context,
const DMFilePtr & data_file,
WriteBatches & wbs) const
{
LOG_DEBUG(log, "ReplaceData - Begin, data_file={}", data_file->path());
ColumnFiles in_memory_files{};
ColumnFilePersisteds persisted_files{};

auto & storage_pool = dm_context.storage_pool;
auto delegate = dm_context.path_pool.getStableDiskDelegator();
WriteBatches wbs(context.storage_pool, context.getWriteLimiter());

RUNTIME_CHECK(delegate.getDTFilePath(data_file->fileId()) == data_file->parentPath());
// If a snapshot is specified, we retain newly written data since the snapshot.
// Otherwise, we just discard everything in the delta layer.
if (segment_snap_opt != nullptr)
{
std::tie(in_memory_files, persisted_files) = delta->cloneNewlyAppendedColumnFiles(
lock,
context,
rowkey_range,
*segment_snap_opt->delta,
wbs);
}

// Always create a ref to the file to allow `data_file` being shared.
auto new_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
// TODO: We could allow assigning multiple DMFiles in future.
auto ref_file = DMFile::restore(
dm_context.db_context.getFileProvider(),
data_file->fileId(),
new_page_id,
data_file->parentPath(),
DMFile::ReadMetaMode::all());
wbs.data.putRefPage(new_page_id, data_file->pageId());
auto new_delta = std::make_shared<DeltaValueSpace>(
delta->getId(),
persisted_files,
in_memory_files);
new_delta->saveMeta(wbs);

auto new_stable = std::make_shared<StableValueSpace>(stable->getId());
new_stable->setFiles({ref_file}, rowkey_range, &dm_context);
new_stable->setFiles({data_file}, rowkey_range, &context);
new_stable->saveMeta(wbs.meta);

// Empty new delta
auto new_delta = std::make_shared<DeltaValueSpace>(
delta->getId());
new_delta->saveMeta(wbs);

auto new_me = std::make_shared<Segment>( //
parent_log,
epoch + 1,
Expand All @@ -930,6 +920,8 @@ SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, //
delta->recordRemoveColumnFilesPages(wbs);
stable->recordRemovePacksPages(wbs);

wbs.writeAll();

LOG_DEBUG(log, "ReplaceData - Finish, old_me={} new_me={}", info(), new_me->info());

return new_me;
Expand Down
41 changes: 18 additions & 23 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,29 +355,20 @@ class Segment : private boost::noncopyable
const StableValueSpacePtr & new_stable) const;

/**
* Only used in tests as a shortcut.
* Normally you should use `dangerouslyReplaceData`.
*/
[[nodiscard]] SegmentPtr dangerouslyReplaceDataForTest(DMContext & dm_context, const DMFilePtr & data_file) const;

/**
* Discard all data in the current delta and stable layer, and use the specified DMFile as the stable instead.
* This API does not have a prepare & apply pair, as it should be quick enough. The specified DMFile is safe
* to be shared for multiple segments.
* Replace all data in the snapshot using the specified DMFile as the stable instead.
* Newly appended data since the snapshot was created will be retained the segment.
*
* Note 1: Should be protected behind the Segment update lock to ensure no new data will be appended to this
* segment during the function call. Otherwise these new data will be lost in the new segment.
* Snapshot is optional. If the snapshot is not specified, it means everything in the
* segment now will be replaced.
*
* Note 2: This function will not enable GC for the new_stable_file for you, in case of you may want to share the same
* stable file for multiple segments. It is your own duty to enable GC later.
* This API does not have a prepare & apply pair, as it should be quick enough.
*
* Note 3: You must ensure the specified new_stable_file has been managed by the storage pool, and has been written
* to the PageStorage's data. Otherwise there will be exceptions.
*
* Note 4: This API is subjected to be changed in future, as it relies on the knowledge that all current data
* in this segment is useless, which is a pretty tough requirement.
* Note 1: You must ensure the DMFile is not shared in multiple segments.
* Note 2: You must enable the GC for the DMFile by yourself.
* Note 3: You must ensure the DMFile has been managed by the storage pool, and has been written
* to the PageStorage's data.
*/
[[nodiscard]] SegmentPtr dangerouslyReplaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, WriteBatches & wbs) const;
[[nodiscard]] SegmentPtr replaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, SegmentSnapshotPtr segment_snap_opt = nullptr) const;

[[nodiscard]] SegmentPtr dropNextSegment(WriteBatches & wbs, const RowKeyRange & next_segment_range);

Expand Down Expand Up @@ -437,9 +428,13 @@ class Segment : private boost::noncopyable
return std::exchange(lock_opt, std::nullopt).value();
}

/// Marks this segment as abandoned.
/// Note: Segment member functions never abandon the segment itself.
/// The abandon state is usually triggered by the DeltaMergeStore.
/**
* Marks this segment as abandoned.
* Note: Segment member functions never abandon the segment itself.
* The abandon state is usually triggered by the DeltaMergeStore.
* When triggering, remember to hold a unique_lock from the DeltaMergeStore.
* Otherwise, the abandon operation may break an existing segment update operation.
*/
void abandon(DMContext & context)
{
LOG_DEBUG(log, "Abandon segment, segment={}", simpleInfo());
Expand Down Expand Up @@ -532,7 +527,7 @@ class Segment : private boost::noncopyable
bool relevant_place) const;

private:
/// The version of this segment. After split / merge / mergeDelta / dangerouslyReplaceData, epoch got increased by 1.
/// The version of this segment. After split / merge / mergeDelta / replaceData, epoch got increased by 1.
const UInt64 epoch;

RowKeyRange rowkey_range;
Expand Down
Loading

0 comments on commit f57ec4b

Please sign in to comment.