storage: handle IngestSST using segment split #6378

Merged on Dec 1, 2022 (26 commits)

Commits
97e1f08
Pick impl from older branch
breezewish Nov 28, 2022
678f40f
Check external files are not overlapped in ingestFiles
breezewish Nov 28, 2022
39d2917
Fix tests
breezewish Nov 28, 2022
1b97371
storage: Add more checks for ingest
breezewish Nov 28, 2022
09aee07
Merge remote-tracking branch 'origin/master' into wenxuan/ingest_chec…
breezewish Nov 28, 2022
a91ded3
Reformat
breezewish Nov 28, 2022
9a3e9d4
Revert the segment abandon change
breezewish Nov 28, 2022
52b7427
Merge remote-tracking branch 'origin/master' into wenxuan/ingest-by-s…
breezewish Nov 28, 2022
85b6ac2
Merge commit 'a91ded340ded7a19629e355e43b0d1784b62bbf7' into wenxuan/…
breezewish Nov 28, 2022
8514195
Address comments
breezewish Nov 29, 2022
cab390b
Cherry pick upstream changes
breezewish Nov 29, 2022
1eac9ae
Fix clang-tidy lints
breezewish Nov 29, 2022
640b397
Merge branch 'master' into wenxuan/ingest_check_not_overlap
ti-chi-bot Nov 29, 2022
525db1e
storage: Accepts a snapshot for segment replaceData API
breezewish Nov 29, 2022
52cf4be
Merge branch 'master' into wenxuan/replace_data_snapshot
breezewish Nov 29, 2022
5648b3c
Merge commit '640b3974c6aa6880d4a65dcfd9b75714b9408ca8' into wenxuan/…
breezewish Nov 29, 2022
edd09e4
Fix typos
breezewish Nov 29, 2022
37c7eba
Merge commit '52cf4be99306a6b4aef4e46a2fdae057bdfb499f' into wenxuan/…
breezewish Nov 29, 2022
2e7122d
Merge commit 'f57ec4be15f1eb3a5e459fe5850b2a1b6ce678d9' into wenxuan/…
breezewish Nov 30, 2022
64b1e94
Fix merge issue
breezewish Nov 30, 2022
49064f4
Add concurrent write & ingest test
breezewish Nov 30, 2022
1a1f7eb
Add debug logs
breezewish Nov 30, 2022
f093c49
Address comments
breezewish Dec 1, 2022
d631a73
Unify the name of splitByIngest and splitByReplace (they are the same)
breezewish Dec 1, 2022
e4b5c02
storage: Simplify segmentIngestData
breezewish Dec 1, 2022
742f389
Merge branch 'master' into wenxuan/ingest-by-split-2
flowbehappy Dec 1, 2022
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
M(DT_SnapshotOfReadRaw) \
M(DT_SnapshotOfSegmentSplit) \
M(DT_SnapshotOfSegmentMerge) \
M(DT_SnapshotOfSegmentIngest) \
M(DT_SnapshotOfDeltaMerge) \
M(DT_SnapshotOfDeltaCompact) \
M(DT_SnapshotOfPlaceIndex) \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Expand Up @@ -91,6 +91,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_context_path) \
M(force_slow_page_storage_snapshot_release) \
M(force_change_all_blobs_to_read_only) \
M(force_ingest_via_delta) \
M(force_ingest_via_split) \
M(unblock_query_init_after_write) \
M(exception_in_merged_task_init) \
M(force_fail_in_flush_region_data)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Expand Up @@ -100,6 +100,8 @@
M(DMCleanReadRows) \
M(DMSegmentIsEmptyFastPath) \
M(DMSegmentIsEmptySlowPath) \
M(DMSegmentIngestDataByReplace) \
M(DMSegmentIngestDataIntoDelta) \
\
M(FileFSync) \
\
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Expand Up @@ -60,7 +60,7 @@ class DeltaValueSpace
, private boost::noncopyable
{
public:
using Lock = std::unique_lock<std::mutex>;
using Lock = std::unique_lock<std::recursive_mutex>;

private:
/// column files in `persisted_file_set` are all persisted in disks and can be restored after restart.
Expand Down Expand Up @@ -94,7 +94,8 @@ class DeltaValueSpace
DeltaIndexPtr delta_index;

// Protects the operations in this instance.
mutable std::mutex mutex;
// It is a recursive_mutex because the lock may be also used by the parent segment as its update lock.
mutable std::recursive_mutex mutex;
@breezewish (Member, Author) commented on Nov 30, 2022:
Note: The recursive locking happens when:

  1. DeltaMergeStore::segmentIngestData grabs a segment update lock.
  2. DeltaMergeStore::segmentIngestData discovers the segment is not empty and then ingests files into the delta:
    • DeltaValueSpace::ingestColumnFiles grabs the same lock to write to the delta VS.

The real problem is that we do not distinguish the segment update lock from the delta lock; when both locks are needed, they conflict. Also, I did not change the ingestColumnFiles API to allow passing an external lock, because other APIs in the DeltaVS do not follow this pattern and I think it would not be a good idea.
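The re-entrant locking described in this comment can be sketched as follows. This is a minimal illustration with hypothetical simplified types, not the real TiFlash classes: with a plain std::mutex the second acquisition by the same thread would deadlock, which is why the member was changed to std::recursive_mutex.

```cpp
#include <cassert>
#include <mutex>

// Sketch of the pattern: the segment update lock and the delta's internal
// lock are the same mutex, so the inner call re-locks a mutex the calling
// thread already holds. std::recursive_mutex permits this; std::mutex does not.
struct DeltaValueSpaceSketch
{
    using Lock = std::unique_lock<std::recursive_mutex>;
    mutable std::recursive_mutex mutex;
    int ingested = 0;

    // Stands in for DeltaValueSpace::ingestColumnFiles: takes the lock itself.
    void ingestColumnFiles()
    {
        Lock lock(mutex);
        ++ingested;
    }
};

// Stands in for DeltaMergeStore::segmentIngestData: holds the segment update
// lock, then calls into the delta, which locks the same mutex a second time.
int segmentIngestDataSketch(DeltaValueSpaceSketch & delta)
{
    DeltaValueSpaceSketch::Lock update_lock(delta.mutex); // 1st acquisition
    delta.ingestColumnFiles();                            // 2nd acquisition, same thread
    return delta.ingested;
}
```

With std::mutex in place of std::recursive_mutex, the inner `Lock lock(mutex)` would be undefined behavior (in practice, a deadlock).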

A contributor replied:

Usually, in DeltaMergeStore::segmentApplyXXX we acquire the segment update lock and do not change the DeltaVS inside the "apply". segmentIngestData brings a special pattern that may change the DeltaVS inside its "apply", but I haven't figured out a better solution.


LoggerPtr log;

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Expand Up @@ -234,6 +234,9 @@ void MemTableSet::appendDeleteRange(const RowKeyRange & delete_range)

void MemTableSet::ingestColumnFiles(const RowKeyRange & range, const ColumnFiles & new_column_files, bool clear_data_in_range)
{
for (const auto & f : new_column_files)
RUNTIME_CHECK(f->isBigFile());

// Prepend a DeleteRange to clean data before applying column files
if (clear_data_in_range)
{
Expand All @@ -243,6 +246,7 @@ void MemTableSet::ingestColumnFiles(const RowKeyRange & range, const ColumnFiles

for (const auto & f : new_column_files)
{
RUNTIME_CHECK(f->isBigFile());
JaySon-Huang marked this conversation as resolved.
appendColumnFileInner(f);
}
}
Expand Down
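The ordering guarantee that MemTableSet::ingestColumnFiles establishes above can be sketched like this. The types are hypothetical simplified stand-ins (strings instead of the real ColumnFile hierarchy): when clear_data_in_range is set, a delete-range marker is recorded before the new files, so a reader replaying the memtable sees the range cleared before the ingested data applies.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified sketch of the prepend-DeleteRange behavior. "delete_range"
// models the ColumnFileDeleteRange that is appended first.
std::vector<std::string> ingestSketch(const std::vector<std::string> & new_files, bool clear_data_in_range)
{
    std::vector<std::string> applied;
    // Prepend a DeleteRange to clean data before applying column files.
    if (clear_data_in_range)
        applied.push_back("delete_range");
    for (const auto & f : new_files)
        applied.push_back(f); // corresponds to appendColumnFileInner(f)
    return applied;
}
```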
36 changes: 21 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Expand Up @@ -266,11 +266,15 @@ class DeltaMergeStore : private boost::noncopyable

void preIngestFile(const String & parent_path, PageId file_id, size_t file_size);

/// You must ensure external files are ordered and do not overlap. Otherwise exceptions will be thrown.
/// You must ensure all of the external files are contained by the range. Otherwise exceptions will be thrown.
void ingestFiles(const DMContextPtr & dm_context, //
const RowKeyRange & range,
const std::vector<DM::ExternalDTFileInfo> & external_files,
bool clear_data_in_range);

/// You must ensure external files are ordered and do not overlap. Otherwise exceptions will be thrown.
/// You must ensure all of the external files are contained by the range. Otherwise exceptions will be thrown.
void ingestFiles(const Context & db_context, //
const DB::Settings & db_settings,
const RowKeyRange & range,
Expand Down Expand Up @@ -504,29 +508,30 @@ class DeltaMergeStore : private boost::noncopyable
SegmentSnapshotPtr segment_snap = nullptr);

/**
* Discard all data in the segment, and use the specified DMFile as the stable instead.
* The specified DMFile is safe to be shared for multiple segments.
* Ingest a DMFile into the segment, optionally causing a new segment being created.
*
* Note 1: This function will not enable GC for the new_stable_file for you, in case of you may want to share the same
* stable file for multiple segments. It is your own duty to enable GC later.
*
* Note 2: You must ensure the specified new_stable_file has been managed by the storage pool, and has been written
* to the PageStorage's data. Otherwise there will be exceptions.
*
* Note 3: This API is subjected to be changed in future, as it relies on the knowledge that all current data
* in this segment is useless, which is a pretty tough requirement.
* Note 1: You must ensure the DMFile is not shared in multiple segments.
* Note 2: You must enable the GC for the DMFile by yourself.
* Note 3: You must ensure the DMFile has been managed by the storage pool, and has been written
* to the PageStorage's data.

* @param clear_all_data_in_segment Whether all data in the segment should be discarded.
* @returns the new segment. The returned segment may be nullptr. In this case, it IS NOT A FAILURE.
* It means the ingest does not cause a segment created and the current segment
* is still valid.
*/
SegmentPtr segmentDangerouslyReplaceData(
SegmentPtr segmentIngestData(
DMContext & dm_context,
const SegmentPtr & segment,
const DMFilePtr & data_file);
const DMFilePtr & data_file,
bool clear_all_data_in_segment);

// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(const std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
bool isSegmentValid(const std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
inline bool isSegmentValid(const std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
bool isSegmentValid(const std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
return doIsSegmentValid(segment);
}
Expand All @@ -553,7 +558,8 @@ class DeltaMergeStore : private boost::noncopyable
DMContext & dm_context,
const SegmentPtr & segment,
const RowKeyRange & ingest_range,
const DMFilePtr & file);
const DMFilePtr & file,
bool clear_data_in_range);

bool updateGCSafePoint();

Expand Down
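The nullptr-is-not-a-failure contract documented on segmentIngestData above can be sketched with a hypothetical caller (simplified stand-in types, not the real SegmentPtr machinery): a null return means no new segment was created and the current one is still valid, so the caller simply keeps using it.

```cpp
#include <cassert>
#include <memory>

// Stand-in for the real Segment class.
struct Segment { int id; };
using SegmentPtr = std::shared_ptr<Segment>;

// Hypothetical helper showing how a caller interprets the return value of
// segmentIngestData: nullptr IS NOT A FAILURE — it means the ingest went
// into the existing segment, which remains valid.
SegmentPtr resolveAfterIngest(const SegmentPtr & current, const SegmentPtr & returned)
{
    return returned != nullptr ? returned : current;
}
```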
150 changes: 115 additions & 35 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Expand Up @@ -19,6 +19,13 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Segment.h>

#include <magic_enum.hpp>

namespace CurrentMetrics
{
extern const Metric DT_SnapshotOfSegmentIngest;
}

namespace ProfileEvents
{
extern const Event DMWriteFile;
Expand All @@ -34,6 +41,8 @@ namespace FailPoints
extern const char pause_when_ingesting_to_dt_store[];
extern const char force_set_segment_ingest_packs_fail[];
extern const char segment_merge_after_ingest_packs[];
extern const char force_ingest_via_delta[];
extern const char force_ingest_via_split[];
} // namespace FailPoints

namespace DM
Expand Down Expand Up @@ -94,10 +103,14 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(

segment_range = segment->getRowKeyRange();

// Write could fail, because other threads could already updated the instance. Like split/merge, merge delta.
ColumnFiles column_files;
// Write could fail, because we do not lock the segment here.
// Thus, other threads may update the instance at any time, like split, merge, merge delta,
// causing the segment to be abandoned.
WriteBatches wbs(*storage_pool, dm_context->getWriteLimiter());

DMFiles data_files;
data_files.reserve(files.size());

for (const auto & file : files)
{
/// Generate DMFile instance with a new ref_id pointed to the file_id.
Expand All @@ -106,19 +119,15 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all());
auto column_file = std::make_shared<ColumnFileBig>(*dm_context, ref_file, segment_range);
if (column_file->getRows() != 0)
{
column_files.emplace_back(std::move(column_file));
wbs.data.putRefPage(page_id, file->pageId());
}
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
}

// We have to commit those file_ids to PageStorage, because as soon as packs are written into segments,
// they are visible for readers who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

bool ingest_success = segment->ingestColumnFiles(*dm_context, range.shrink(segment_range), column_files, clear_data_in_range);
bool ingest_success = segment->ingestDataToDelta(*dm_context, range.shrink(segment_range), data_files, clear_data_in_range);
fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; });
if (ingest_success)
{
Expand All @@ -145,12 +154,10 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(

/**
* Accept a target ingest range and a vector of DTFiles, ingest these DTFiles (clipped by the target ingest range)
* using logical split. All data in the target ingest range will be cleared, and replaced by the specified DTFiles.
* using logical split.
*
* `clear_data_in_range` must be true. Otherwise exceptions will be thrown.
*
* You must ensure DTFiles do not overlap. Otherwise you will lose data.
* // TODO: How to check this? Maybe we can enforce external_files to be ordered.
* You must ensure DTFiles do not overlap. Otherwise this function will not work properly when clear_data_in_range == true.
* The check is performed in `ingestFiles`.
*
* WARNING: This function does not guarantee isolation. You may observe partial results when
* querying related segments when this function is running.
Expand All @@ -162,20 +169,23 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
const DMFiles & files,
bool clear_data_in_range)
{
RUNTIME_CHECK(clear_data_in_range == true);
RUNTIME_CHECK(
files.size() == external_files.size(),
files.size(),
external_files.size());
for (size_t i = 0; i < files.size(); ++i)
{
RUNTIME_CHECK(files[i]->pageId() == external_files[i].id);
RUNTIME_CHECK(
files.size() == external_files.size(),
files.size(),
external_files.size());
for (size_t i = 0; i < files.size(); ++i)
RUNTIME_CHECK(
files[i]->pageId() == external_files[i].id,
files[i]->pageId(),
external_files[i].toString());
}

std::set<SegmentPtr> updated_segments;

// First phase (DeleteRange Phase):
// Write DeleteRange to the covered segments to ensure that all data in the `ingest_range` is cleared.
if (clear_data_in_range)
{
RowKeyRange remaining_delete_range = ingest_range;
LOG_INFO(
Expand Down Expand Up @@ -220,16 +230,16 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
// segment may be abandoned, retry current range by finding the segment again.
}
}
}

LOG_DEBUG(
log,
"Table ingest using split - delete range phase - finished, updated_segments_n={}",
updated_segments.size());
LOG_DEBUG(
log,
"Table ingest using split - delete range phase - finished, updated_segments_n={}",
updated_segments.size());
}

/*
* In second phase (SplitIngest Phase),
* we will try to ingest DMFile one by one into the segments in order.
* we will try to ingest DMFile one by one into the segments.
*
* Consider the following case:
* -Inf +Inf
Expand Down Expand Up @@ -323,7 +333,7 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
segment->simpleInfo(),
segment_ingest_range.toDebugString());

const bool succeeded = ingestDTFileIntoSegmentUsingSplit(*dm_context, segment, segment_ingest_range, files[file_idx]);
const bool succeeded = ingestDTFileIntoSegmentUsingSplit(*dm_context, segment, segment_ingest_range, files[file_idx], clear_data_in_range);
if (succeeded)
{
updated_segments.insert(segment);
Expand Down Expand Up @@ -356,7 +366,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
DMContext & dm_context,
const SegmentPtr & segment,
const RowKeyRange & ingest_range,
const DMFilePtr & file)
const DMFilePtr & file,
bool clear_data_in_range)
{
const auto & segment_range = segment->getRowKeyRange();

Expand All @@ -379,16 +390,47 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
if (is_start_matching && is_end_matching)
{
/*
* The segment and the ingest range is perfectly matched. We can
* simply replace all of the data from this segment.
* The segment and the ingest range is perfectly matched.
*
* Example:
* │----------- Segment ----------│
* │-------- Ingest Range --------│
*/
const auto new_segment_or_null = segmentDangerouslyReplaceData(dm_context, segment, file);
const bool succeeded = new_segment_or_null != nullptr;
return succeeded;

auto delegate = dm_context.path_pool.getStableDiskDelegator();
auto file_provider = dm_context.db_context.getFileProvider();

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

// Generate DMFile instance with a new ref_id pointed to the file_id,
// because we may use the same DMFile to ingest into multiple segments.
auto new_page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto ref_file = DMFile::restore(
file_provider,
file->fileId(),
new_page_id,
file->parentPath(),
DMFile::ReadMetaMode::all());
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
// they are visible for readers immediately, who require file_ids to be found in PageStorage.
wbs.writeLogAndData();

// clear_all_data_in_segment == clear_data_in_range is safe, because we have verified
// that current segment range is identical to the ingest range.
auto new_segment = segmentIngestData(dm_context, segment, ref_file, clear_data_in_range);
if (new_segment == nullptr)
{
// When ingest failed, just discard this ref file.
wbs.rollbackWrittenLogAndData();

// Failed, segment may be abandoned, or there is another update in progress.
std::this_thread::sleep_for(std::chrono::milliseconds(15));
return false;
}

return true;
}
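The boundary-matching logic above can be summarized as a small classifier. This is a sketch with simplified half-open integer ranges rather than the real RowKeyRange: a perfect match lets the ingest replace the whole segment via segmentIngestData, while a partial match requires splitting at the non-matching boundary and retrying on the narrowed segment.

```cpp
#include <cassert>

// The four cases of segment range [s_begin, s_end) vs ingest range
// [i_begin, i_end), assuming the ingest range is contained in the segment.
enum class IngestAction { ReplaceWhole, SplitAtEnd, SplitAtStart, SplitBoth };

IngestAction classify(int s_begin, int s_end, int i_begin, int i_end)
{
    const bool is_start_matching = (s_begin == i_begin);
    const bool is_end_matching = (s_end == i_end);
    if (is_start_matching && is_end_matching)
        return IngestAction::ReplaceWhole; // ingest covers the whole segment
    if (is_start_matching)
        return IngestAction::SplitAtEnd;   // split at the ingest range's end
    if (is_end_matching)
        return IngestAction::SplitAtStart; // split at the ingest range's start
    return IngestAction::SplitBoth;        // one split narrows it to a one-end match
}
```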
else if (is_start_matching)
{
Expand Down Expand Up @@ -463,6 +505,41 @@ void DeltaMergeStore::ingestFiles(
throw Exception(msg);
}

{
// `ingestDTFilesUsingSplit` requires external_files to be not overlapped. Otherwise the results will be incorrect.
// Here we verify the external_files are ordered and not overlapped.
// "Ordered" is actually not a hard requirement by `ingestDTFilesUsingSplit`. However "ordered" makes us easy to check overlap efficiently.
RowKeyValue last_end;
if (is_common_handle)
last_end = RowKeyValue::COMMON_HANDLE_MIN_KEY;
else
last_end = RowKeyValue::INT_HANDLE_MIN_KEY;

// Suppose we have keys: 1, 2, | 3, 4, 5, | 6, | 7, 8
// Our file ranges will be: [1, 3), [3, 6), [6, 7), [7, 9)
// ↑ ↑
// A B
// We require A <= B.
for (const auto & ext_file : external_files)
{
RUNTIME_CHECK(
!ext_file.range.none(),
ext_file.toString());
RUNTIME_CHECK(
compare(last_end.toRowKeyValueRef(), ext_file.range.getStart()) <= 0,
last_end.toDebugString(),
ext_file.toString());
last_end = ext_file.range.end;
}

// Check whether all external files are contained by the range.
for (const auto & ext_file : external_files)
{
RUNTIME_CHECK(compare(range.getStart(), ext_file.range.getStart()) <= 0);
RUNTIME_CHECK(compare(range.getEnd(), ext_file.range.getEnd()) >= 0);
}
}
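The ordered/non-overlap verification above reduces to a single linear scan, which is why the code insists on sorted input. A standalone sketch with half-open integer ranges in place of RowKeyRange (INT_MIN stands in for INT_HANDLE_MIN_KEY):

```cpp
#include <cassert>
#include <climits>
#include <utility>
#include <vector>

// Files must be non-empty, sorted, and pairwise non-overlapping: each
// file's start key must be >= the previous file's end key.
bool checkOrderedNonOverlapping(const std::vector<std::pair<int, int>> & files)
{
    int last_end = INT_MIN; // stands in for INT_HANDLE_MIN_KEY
    for (const auto & [begin, end] : files)
    {
        if (begin >= end)     // empty range is rejected (range.none())
            return false;
        if (begin < last_end) // out of order, or overlaps the previous file
            return false;
        last_end = end;
    }
    return true;
}
```

Requiring sorted input makes the overlap check O(n) with one remembered boundary instead of comparing all pairs, matching the comment that "ordered" is only there to make the overlap check cheap.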

EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS);

auto delegate = dm_context->path_pool.getStableDiskDelegator();
Expand Down Expand Up @@ -491,12 +568,15 @@ void DeltaMergeStore::ingestFiles(
}

bool ingest_using_split = false;
if (clear_data_in_range && bytes >= dm_context->delta_small_column_file_bytes)
if (bytes >= dm_context->delta_small_column_file_bytes)
{
// We still write small ssts directly into the delta layer.
ingest_using_split = true;
}

fiu_do_on(FailPoints::force_ingest_via_delta, { ingest_using_split = false; });
fiu_do_on(FailPoints::force_ingest_via_split, { ingest_using_split = true; });

{
auto get_ingest_files = [&] {
FmtBuffer fmt_buf;
Expand Down