Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

share PageStorage V3 inside tiflash instance #4220

Merged
merged 35 commits into from
Mar 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
36d40ae
partial version
lidezhu Mar 8, 2022
2e40be5
fix conflict
lidezhu Mar 14, 2022
b074055
add GlobalStoragePool
lidezhu Mar 14, 2022
e2dc2f5
small fix
lidezhu Mar 14, 2022
81e6098
v3 for region persister
JaySon-Huang Mar 11, 2022
1220ad5
Merge branch 'support-share-page-storage' into support-share-page-sto…
JaySon-Huang Mar 14, 2022
b2d1815
Merge pull request #3 from JaySon-Huang/support-share-page-storage
lidezhu Mar 14, 2022
70e8290
try remove some duplicate code
lidezhu Mar 14, 2022
214a44e
small fix
lidezhu Mar 14, 2022
45876c4
small fix
lidezhu Mar 14, 2022
8a90011
Merge branch 'master' into support-share-page-storage
lidezhu Mar 14, 2022
48584a1
fix static analysis
lidezhu Mar 14, 2022
a400b73
change config name
lidezhu Mar 14, 2022
330807c
more change about config name
lidezhu Mar 14, 2022
8f7e891
ignore some stat in v3 mode
lidezhu Mar 15, 2022
4005c73
fix getMaxId
lidezhu Mar 15, 2022
426d3e3
Update dbms/src/Server/StorageConfigParser.cpp
lidezhu Mar 15, 2022
16c66de
handle the lifecycle of external page callbacks
lidezhu Mar 15, 2022
94b4035
Merge branch 'support-share-page-storage' of github.com:lidezhu/tics …
lidezhu Mar 15, 2022
48d3226
remove extra line
lidezhu Mar 15, 2022
046a37a
fix build
lidezhu Mar 15, 2022
3c30d07
add global single disk delegator
lidezhu Mar 16, 2022
54c6b82
fix build
lidezhu Mar 16, 2022
75f1531
add gtest for getMaxId
lidezhu Mar 16, 2022
f4ee341
improve external page callbacks
lidezhu Mar 16, 2022
8faec36
Merge branch 'master' into support-share-page-storage
lidezhu Mar 17, 2022
4ffbb2a
fix comment
lidezhu Mar 17, 2022
037fc39
Fix bug for running dtworkload with ps v3
JaySon-Huang Mar 9, 2022
2ca6ccc
Clean
JaySon-Huang Mar 17, 2022
80c7814
fix comment
lidezhu Mar 18, 2022
2a96538
Merge pull request #4 from JaySon-Huang/test_with_dt_stress
lidezhu Mar 18, 2022
ddc006a
Fix lint
JaySon-Huang Mar 18, 2022
90c94b9
Merge branch 'master' into support-share-page-storage
lidezhu Mar 18, 2022
112ae89
fix build
lidezhu Mar 18, 2022
f41e7f9
Merge branch 'master' into support-share-page-storage
lidezhu Mar 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -161,6 +163,7 @@ struct ContextShared
PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
DM::GlobalStoragePoolPtr global_storage_pool;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

class SessionKeyHash
Expand Down Expand Up @@ -1571,6 +1574,73 @@ ReadLimiterPtr Context::getReadLimiter() const
return getIORateLimiter().getReadLimiter();
}

static bool isUsingPageStorageV3(const PathPool & path_pool, bool enable_ps_v3)
{
// Check whether v3 is already enabled
for (const auto & path : path_pool.listGlobalPagePaths())
{
if (PS::V3::PageStorageImpl::isManifestsFileExists(path))
{
return true;
}
}

// Check whether v3 on new node is enabled in the config, if not, no need to check anymore
if (!enable_ps_v3)
return false;

// Check whether there are any files in kvstore path, if exists, then this is not a new node.
// If it's a new node, then we enable v3. Otherwise, we use v2.
for (const auto & path : path_pool.listKVStorePaths())
{
Poco::File dir(path);
if (!dir.exists())
continue;

std::vector<std::string> files;
dir.list(files);
if (!files.empty())
{
return false;
}
}
return true;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3)
{
auto lock = getLock();
if (isUsingPageStorageV3(path_pool, enable_ps_v3))
{
try
{
// create manifests file before initialize GlobalStoragePool
for (const auto & path : path_pool.listGlobalPagePaths())
PS::V3::PageStorageImpl::createManifestsFileIfNeed(path);

shared->global_storage_pool = std::make_shared<DM::GlobalStoragePool>(path_pool, *this, settings);
shared->global_storage_pool->restore();
return true;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
else
{
shared->global_storage_pool = nullptr;
return false;
}
}

DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
{
auto lock = getLock();
return shared->global_storage_pool;
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ namespace DM
{
class MinMaxIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
} // namespace DM

/// (database name, table name)
Expand Down Expand Up @@ -408,6 +410,10 @@ class Context
ReadLimiterPtr getReadLimiter() const;
IORateLimiter & getIORateLimiter() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3);

DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Compiler & getCompiler();

/// Call after initialization before using system logs. Call for global context.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Settings
M(SettingBool, group_by_collation_sensitive, false, "do group by with collation info.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minial chunk size of exchanging data among TiFlash.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minimal chunk size of exchanging data among TiFlash.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \
M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \
Expand Down Expand Up @@ -282,7 +282,7 @@ struct Settings
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.") \
M(SettingBool, dt_read_delta_only, false, "Only read delta data in DeltaTree Engine.") \
M(SettingBool, dt_read_stable_only, false, "Only read stable data in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_flush_after_write, false, "Flush cache or not after write in DeltaTree Engine.") \
M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.") \
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,11 @@ int benchEntry(const std::vector<std::string> & opts)
auto db_context = env.getContext();
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
raft_config.enable_compatible_mode, //
global_context->getPathCapacity(),
global_context->getFileProvider());
// must initialize before the following operation:
// 1. load data from disk(because this process may depend on the initialization of global StoragePool)
// 2. initialize KVStore service
// 1) because we need to check whether this is the first startup of this node, and we judge it based on whether there are any files in kvstore directory
// 2) KVStore service also choose its data format based on whether the GlobalStoragePool is initialized
if (global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool(), storage_config.enable_ps_v3))
LOG_FMT_INFO(log, "PageStorage V3 enabled.");

// Use pd address to define which default_database we use by default.
// For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash.
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, Poco::Logge
lazily_init_store = (*lazily_init != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store);
// config for experimental feature, may remove later
if (auto enable_v3 = table->get_qualified_as<Int32>("enable_ps_v3"); enable_v3)
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
enable_ps_v3 = (*enable_v3 != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {} enable_ps_v3 {}", format_version, lazily_init_store, enable_ps_v3);
}

Strings TiFlashStorageConfig::getAllNormalPaths() const
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct TiFlashStorageConfig

UInt64 format_version = 0;
bool lazily_init_store = true;
bool enable_ps_v3 = false;

public:
TiFlashStorageConfig() = default;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
}
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Blo

PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.page_id_generator.newLogPageId();
auto page_id = context.storage_pool.newLogPageId();

MemoryWriteBuffer write_buf;
PageFieldSizes col_data_sizes;
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class StoragePathPool;
namespace DM
{
class StoragePool;
class PageIdGenerator;
using NotCompress = std::unordered_set<ColId>;
struct DMContext;
using DMContextPtr = std::shared_ptr<DMContext>;
Expand All @@ -41,7 +40,6 @@ struct DMContext : private boost::noncopyable

StoragePathPool & path_pool;
StoragePool & storage_pool;
PageIdGenerator & page_id_generator;
const UInt64 hash_salt;

// gc safe-point, maybe update.
Expand Down Expand Up @@ -89,7 +87,6 @@ struct DMContext : private boost::noncopyable
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
StoragePool & storage_pool_,
PageIdGenerator & page_id_generator_,
const UInt64 hash_salt_,
const DB::Timestamp min_version_,
const NotCompress & not_compress_,
Expand All @@ -100,7 +97,6 @@ struct DMContext : private boost::noncopyable
: db_context(db_context_)
, path_pool(path_pool_)
, storage_pool(storage_pool_)
, page_id_generator(page_id_generator_)
, hash_salt(hash_salt_)
, min_version(min_version_)
, not_compress(not_compress_)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,15 @@ ColumnFilePersisteds ColumnFilePersistedSet::checkHeadAndCloneTail(DMContext & c
else if (auto * t_file = column_file->tryToTinyFile(); t_file)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.page_id_generator.newLogPageId();
PageId new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t_file->getDataPageId());
auto new_column_file = t_file->cloneWith(new_data_page_id);
cloned_tail.push_back(new_column_file);
}
else if (auto * b_file = column_file->tryToBigFile(); b_file)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = b_file->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ struct DMContext;
struct WriteBatches;
class StoragePool;

class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
class DeltaValueSpace
: public std::enable_shared_from_this<DeltaValueSpace>
, private boost::noncopyable
{
public:
Expand Down Expand Up @@ -166,7 +167,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
// Other thread is doing structure update, just return.
if (!is_updating.compare_exchange_strong(v, true))
{
LOG_DEBUG(log, simpleInfo() << " Stop create snapshot because updating");
LOG_FMT_DEBUG(log, "{} Stop create snapshot because updating", simpleInfo());
return false;
}
return true;
Expand All @@ -177,7 +178,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
bool v = true;
if (!is_updating.compare_exchange_strong(v, false))
{
LOG_ERROR(log, "!!!=========================delta [" << getId() << "] is expected to be updating=========================!!!");
LOG_FMT_ERROR(log, "!!!=========================delta [ {}] is expected to be updating=========================!!!", getId());
return false;
}
else
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.page_id_generator.newLogPageId();
PageId new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);

Expand All @@ -101,7 +101,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * f = column_file->tryToBigFile(); f)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
Loading