Skip to content

Commit

Permalink
Merge branch 'release-6.1' into cherry-pick-4938-to-release-6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
hehechen authored May 24, 2022
2 parents 434cb8e + c9fd034 commit 9c23380
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 181 deletions.
13 changes: 11 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,19 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
!first_segment_entry.isValid())
{
// Create the first segment.
auto segment_id = storage_pool->newMetaPageId();
if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID)
throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR);
{
if (page_storage_run_mode == PageStorageRunMode::ONLY_V2)
{
throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR);
}

// In ONLY_V3 or MIX_MODE, If create a new DeltaMergeStore
// Should used fixed DELTA_MERGE_FIRST_SEGMENT_ID to create first segment
segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
}

auto first_segment
= Segment::newSegment(*dm_context, store_columns, RowKeyRange::newAll(is_common_handle, rowkey_column_size), segment_id, 0);
segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment);
Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,18 +378,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_v2->restore();
meta_storage_v2->restore();

max_log_page_id = log_storage_v2->getMaxId(ns_id);
max_data_page_id = data_storage_v2->getMaxId(ns_id);
max_meta_page_id = meta_storage_v2->getMaxId(ns_id);
max_log_page_id = log_storage_v2->getMaxId();
max_data_page_id = data_storage_v2->getMaxId();
max_meta_page_id = meta_storage_v2->getMaxId();

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only};
break;
}
case PageStorageRunMode::ONLY_V3:
{
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId();
max_data_page_id = data_storage_v3->getMaxId();
max_meta_page_id = meta_storage_v3->getMaxId();

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
break;
Expand Down Expand Up @@ -456,18 +456,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3);
meta_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3);

max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId();
max_data_page_id = data_storage_v3->getMaxId();
max_meta_page_id = meta_storage_v3->getMaxId();

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id));
max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id));
max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id));
max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId());
max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId());
max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId());
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,18 @@ class StoragePool : private boost::noncopyable
// Caller must cancel gc tasks before drop
void drop();

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);
// For function `newLogPageId`,`newMetaPageId`,`newDataPageIdForDTFile`:
// For PageStorageRunMode::ONLY_V2, every table have its own three PageStorage (meta/data/log).
// So these functions return the Page id starts from 1 and is continuously incremented.
// For PageStorageRunMode::ONLY_V3/MIX_MODE, PageStorage is global(distinguish by ns_id for different table).
// In order to avoid Page id from being reused (and cause troubles while restoring WAL from disk),
// StoragePool will assign the max_log_page_id/max_meta_page_id/max_data_page_id by the global max id
// regardless of ns_id while being restored. This causes the ids in a table to not be continuously incremented.

PageId maxMetaPageId() { return max_meta_page_id; }
PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);
PageId newLogPageId() { return ++max_log_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,17 @@ class PageStorage : private boost::noncopyable

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;
// Get the max id from PageStorage.
//
// For V2, every table have its own three PageStorage (meta/data/log).
// So this function return the Page id starts from 0 and is continuously incremented to
// new pages.
// For V3, PageStorage is global(distinguish by ns_id for different table).
// In order to avoid Page id from being reused (and cause troubles while restoring WAL from disk),
// this function returns the global max id regardless of ns_id. This causes the ids in a table
// to not be continuously incremented.
// Note that Page id 1 in each ns_id is special.
virtual PageId getMaxId() = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void PageStorage::restore()
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}

PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
PageId PageStorage::getMaxId()
{
std::lock_guard write_lock(write_mutex);
return versioned_page_entries.getSnapshot("")->version()->maxId();
Expand Down Expand Up @@ -893,9 +893,9 @@ void PageStorage::drop()
struct GcContext
{
PageFileIdAndLevel min_file_id;
PageFile::Type min_file_type;
PageFile::Type min_file_type = PageFile::Type::Invalid;
PageFileIdAndLevel max_file_id;
PageFile::Type max_file_type;
PageFile::Type max_file_type = PageFile::Type::Invalid;
size_t num_page_files = 0;
size_t num_legacy_files = 0;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class PageStorage : public DB::PageStorage

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;
PageId getMaxId() override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

Expand Down
49 changes: 7 additions & 42 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p
*************************/

PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 max_persisted_log_files_)
: sequence(0)
: max_page_id(0)
, sequence(0)
, wal(std::move(wal_))
, max_persisted_log_files(max_persisted_log_files_)
, log(Logger::get("PageDirectory", std::move(storage_name)))
Expand Down Expand Up @@ -923,49 +924,10 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const
}
}

PageId PageDirectory::getMaxId(NamespaceId ns_id) const
PageId PageDirectory::getMaxId() const
{
std::shared_lock read_lock(table_rw_mutex);
PageIdV3Internal upper_bound = buildV3Id(ns_id, UINT64_MAX);

auto iter = mvcc_table_directory.upper_bound(upper_bound);
if (iter == mvcc_table_directory.begin())
{
// The smallest page id is greater than the target page id or mvcc_table_directory is empty,
// and it means no page id is less than or equal to the target page id, return 0.
return 0;
}
else
{
// iter is not at the beginning and mvcc_table_directory is not empty,
// so iter-- must be a valid iterator, and it's the largest page id which is smaller than the target page id.
iter--;

do
{
// Can't find any entries in current ns_id
if (iter->first.high != ns_id)
{
break;
}

// Check and return whether this id is visible, otherwise continue to check the previous one.
if (iter->second->isVisible(UINT64_MAX - 1))
{
return iter->first.low;
}

// Current entry/ref/external is deleted and there are no entries before it.
if (iter == mvcc_table_directory.begin())
{
break;
}

iter--;
} while (true);

return 0;
}
return max_page_id;
}

std::set<PageIdV3Internal> PageDirectory::getAllPageIds()
Expand Down Expand Up @@ -1069,6 +1031,9 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
// stage 2, create entry version list for page_id.
for (const auto & r : edit.getRecords())
{
// Protected in write_lock
max_page_id = std::max(max_page_id, r.page_id.low);

auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr));
if (created)
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class PageDirectory
}
#endif

PageId getMaxId(NamespaceId ns_id) const;
PageId getMaxId() const;

std::set<PageIdV3Internal> getAllPageIds();

Expand Down Expand Up @@ -397,6 +397,7 @@ class PageDirectory
}

private:
PageId max_page_id;
std::atomic<UInt64> sequence;
mutable std::shared_mutex table_rw_mutex;
MVCCMapType mvcc_table_directory;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP
// After restoring from the disk, we need cleanup all invalid entries in memory, or it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);

if (blob_stats)
{
Expand Down Expand Up @@ -111,7 +112,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
{
if (max_applied_ver < r.version)
max_applied_ver = r.version;
max_applied_page_id = std::max(r.page_id, max_applied_page_id);

applyRecord(dir, r);
}
Expand All @@ -127,6 +127,8 @@ void PageDirectoryFactory::applyRecord(
iter->second = std::make_shared<VersionedPageEntries>();
}

dir->max_page_id = std::max(dir->max_page_id, r.page_id.low);

const auto & version_list = iter->second;
const auto & restored_version = r.version;
try
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class PageDirectoryFactory
{
public:
PageVersion max_applied_ver;
PageIdV3Internal max_applied_page_id;

PageDirectoryFactory & setBlobStore(BlobStore & blob_store)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ void PageStorageImpl::restore()
.create(storage_name, file_provider, delegator, parseWALConfig(config));
}

PageId PageStorageImpl::getMaxId(NamespaceId ns_id)
PageId PageStorageImpl::getMaxId()
{
return page_directory->getMaxId(ns_id);
return page_directory->getMaxId();
}

void PageStorageImpl::drop()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class PageStorageImpl : public DB::PageStorage

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;
PageId getMaxId() override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void deserializeFrom(ReadBuffer & buf, PageEntriesEdit & edit)
break;
}
default:
throw Exception(fmt::format("Unknown record type: {}", record_type));
throw Exception(fmt::format("Unknown record type: {}", record_type), ErrorCodes::LOGICAL_ERROR);
}
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@ PageEntriesEdit deserializeFrom(std::string_view record)
UInt32 version = 0;
readIntBinary(version, buf);
if (version != 1)
throw Exception("");
throw Exception(fmt::format("Unknown version for PageEntriesEdit deser [version={}]", version), ErrorCodes::LOGICAL_ERROR);

deserializeFrom(buf, edit);
return edit;
Expand Down
Loading

0 comments on commit 9c23380

Please sign in to comment.