Skip to content

Commit

Permalink
share PageStorage V3 inside tiflash instance (#4220)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
lidezhu authored Mar 19, 2022
1 parent 358d528 commit 5f163e6
Show file tree
Hide file tree
Showing 47 changed files with 847 additions and 292 deletions.
70 changes: 70 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -161,6 +163,7 @@ struct ContextShared
PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics
FileProviderPtr file_provider; /// File provider.
IORateLimiter io_rate_limiter;
DM::GlobalStoragePoolPtr global_storage_pool;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

class SessionKeyHash
Expand Down Expand Up @@ -1571,6 +1574,73 @@ ReadLimiterPtr Context::getReadLimiter() const
return getIORateLimiter().getReadLimiter();
}

static bool isUsingPageStorageV3(const PathPool & path_pool, bool enable_ps_v3)
{
// Check whether v3 is already enabled
for (const auto & path : path_pool.listGlobalPagePaths())
{
if (PS::V3::PageStorageImpl::isManifestsFileExists(path))
{
return true;
}
}

// Check whether v3 on new node is enabled in the config, if not, no need to check anymore
if (!enable_ps_v3)
return false;

// Check whether there are any files in kvstore path, if exists, then this is not a new node.
// If it's a new node, then we enable v3. Otherwise, we use v2.
for (const auto & path : path_pool.listKVStorePaths())
{
Poco::File dir(path);
if (!dir.exists())
continue;

std::vector<std::string> files;
dir.list(files);
if (!files.empty())
{
return false;
}
}
return true;
}

bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3)
{
auto lock = getLock();
if (isUsingPageStorageV3(path_pool, enable_ps_v3))
{
try
{
// create manifests file before initialize GlobalStoragePool
for (const auto & path : path_pool.listGlobalPagePaths())
PS::V3::PageStorageImpl::createManifestsFileIfNeed(path);

shared->global_storage_pool = std::make_shared<DM::GlobalStoragePool>(path_pool, *this, settings);
shared->global_storage_pool->restore();
return true;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
else
{
shared->global_storage_pool = nullptr;
return false;
}
}

DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
{
auto lock = getLock();
return shared->global_storage_pool;
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ namespace DM
{
class MinMaxIndexCache;
class DeltaIndexManager;
class GlobalStoragePool;
using GlobalStoragePoolPtr = std::shared_ptr<GlobalStoragePool>;
} // namespace DM

/// (database name, table name)
Expand Down Expand Up @@ -408,6 +410,10 @@ class Context
ReadLimiterPtr getReadLimiter() const;
IORateLimiter & getIORateLimiter() const;

bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3);

DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

Compiler & getCompiler();

/// Call after initialization before using system logs. Call for global context.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct Settings
M(SettingBool, group_by_collation_sensitive, false, "do group by with collation info.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minial chunk size of exchanging data among TiFlash.") \
M(SettingInt64, batch_send_min_limit, DEFAULT_BATCH_SEND_MIN_LIMIT, "default minimal chunk size of exchanging data among TiFlash.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \
M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \
Expand Down Expand Up @@ -282,7 +282,7 @@ struct Settings
M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.") \
M(SettingBool, dt_read_delta_only, false, "Only read delta data in DeltaTree Engine.") \
M(SettingBool, dt_read_stable_only, false, "Only read stable data in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_logical_split, false, "Enable logical split or not in DeltaTree Engine.") \
M(SettingBool, dt_flush_after_write, false, "Flush cache or not after write in DeltaTree Engine.") \
M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.") \
M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.") \
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,11 @@ int benchEntry(const std::vector<std::string> & opts)
auto db_context = env.getContext();
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
raft_config.enable_compatible_mode, //
global_context->getPathCapacity(),
global_context->getFileProvider());
// must initialize before the following operation:
// 1. load data from disk(because this process may depend on the initialization of global StoragePool)
// 2. initialize KVStore service
// 1) because we need to check whether this is the first startup of this node, and we judge it based on whether there are any files in kvstore directory
// 2) KVStore service also choose its data format based on whether the GlobalStoragePool is initialized
if (global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool(), storage_config.enable_ps_v3))
LOG_FMT_INFO(log, "PageStorage V3 enabled.");

// Use pd address to define which default_database we use by default.
// For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash.
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, Poco::Logge
lazily_init_store = (*lazily_init != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store);
// config for experimental feature, may remove later
if (auto enable_v3 = table->get_qualified_as<Int32>("enable_ps_v3"); enable_v3)
{
enable_ps_v3 = (*enable_v3 != 0);
}

LOG_FMT_INFO(log, "format_version {} lazily_init_store {} enable_ps_v3 {}", format_version, lazily_init_store, enable_ps_v3);
}

Strings TiFlashStorageConfig::getAllNormalPaths() const
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct TiFlashStorageConfig

UInt64 format_version = 0;
bool lazily_init_store = true;
bool enable_ps_v3 = false;

public:
TiFlashStorageConfig() = default;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
}
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Blo

PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.page_id_generator.newLogPageId();
auto page_id = context.storage_pool.newLogPageId();

MemoryWriteBuffer write_buf;
PageFieldSizes col_data_sizes;
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class StoragePathPool;
namespace DM
{
class StoragePool;
class PageIdGenerator;
using NotCompress = std::unordered_set<ColId>;
struct DMContext;
using DMContextPtr = std::shared_ptr<DMContext>;
Expand All @@ -41,7 +40,6 @@ struct DMContext : private boost::noncopyable

StoragePathPool & path_pool;
StoragePool & storage_pool;
PageIdGenerator & page_id_generator;
const UInt64 hash_salt;

// gc safe-point, maybe update.
Expand Down Expand Up @@ -89,7 +87,6 @@ struct DMContext : private boost::noncopyable
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
StoragePool & storage_pool_,
PageIdGenerator & page_id_generator_,
const UInt64 hash_salt_,
const DB::Timestamp min_version_,
const NotCompress & not_compress_,
Expand All @@ -100,7 +97,6 @@ struct DMContext : private boost::noncopyable
: db_context(db_context_)
, path_pool(path_pool_)
, storage_pool(storage_pool_)
, page_id_generator(page_id_generator_)
, hash_salt(hash_salt_)
, min_version(min_version_)
, not_compress(not_compress_)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,15 @@ ColumnFilePersisteds ColumnFilePersistedSet::checkHeadAndCloneTail(DMContext & c
else if (auto * t_file = column_file->tryToTinyFile(); t_file)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.page_id_generator.newLogPageId();
PageId new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t_file->getDataPageId());
auto new_column_file = t_file->cloneWith(new_data_page_id);
cloned_tail.push_back(new_column_file);
}
else if (auto * b_file = column_file->tryToBigFile(); b_file)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = b_file->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ struct DMContext;
struct WriteBatches;
class StoragePool;

class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
class DeltaValueSpace
: public std::enable_shared_from_this<DeltaValueSpace>
, private boost::noncopyable
{
public:
Expand Down Expand Up @@ -166,7 +167,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
// Other thread is doing structure update, just return.
if (!is_updating.compare_exchange_strong(v, true))
{
LOG_DEBUG(log, simpleInfo() << " Stop create snapshot because updating");
LOG_FMT_DEBUG(log, "{} Stop create snapshot because updating", simpleInfo());
return false;
}
return true;
Expand All @@ -177,7 +178,7 @@ class DeltaValueSpace : public std::enable_shared_from_this<DeltaValueSpace>
bool v = true;
if (!is_updating.compare_exchange_strong(v, false))
{
LOG_ERROR(log, "!!!=========================delta [" << getId() << "] is expected to be updating=========================!!!");
LOG_FMT_ERROR(log, "!!!=========================delta [ {}] is expected to be updating=========================!!!", getId());
return false;
}
else
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.page_id_generator.newLogPageId();
PageId new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);

Expand All @@ -101,7 +101,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * f = column_file->tryToBigFile(); f)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
Loading

0 comments on commit 5f163e6

Please sign in to comment.