Skip to content

Commit

Permalink
PageStorage: Fine grained lock on external callbacks (#5699)
Browse files Browse the repository at this point in the history
close #5697
  • Loading branch information
JaySon-Huang authored Sep 5, 2022
1 parent f4a3ea0 commit 4752b4b
Show file tree
Hide file tree
Showing 22 changed files with 457 additions and 381 deletions.
36 changes: 16 additions & 20 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,15 @@ try
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
}

const String to_tbl_name = "t_112";
const String to_tbl_display_name = "tbl_test";
{
// Rename table
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db, to_tbl_name, db_name, to_tbl_name);
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db, tbl_name, db_name, to_tbl_display_name);

auto old_storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(old_storage, nullptr);

auto storage = db->tryGetTable(ctx, to_tbl_name);
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), to_tbl_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
Expand All @@ -294,13 +291,13 @@ try
// Drop table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = db_name;
drop_query->table = to_tbl_name;
drop_query->table = tbl_name;
drop_query->if_exists = false;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, ctx);
drop_interpreter.execute();

auto storage = db->tryGetTable(ctx, to_tbl_name);
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down Expand Up @@ -391,18 +388,18 @@ try
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
}

const String to_tbl_name = "t_112";
const String to_tbl_display_name = "tbl_test";
{
// Rename table
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name);
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, to_tbl_display_name);

auto old_storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(old_storage, nullptr);

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), to_tbl_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
EXPECT_EQ(managed_storage->getDatabaseName(), db2_name);
Expand All @@ -412,13 +409,13 @@ try
// Drop table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = db2_name;
drop_query->table = to_tbl_name;
drop_query->table = tbl_name;
drop_query->if_exists = false;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, ctx);
drop_interpreter.execute();

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down Expand Up @@ -501,18 +498,17 @@ try
EXPECT_FALSE(db->empty(ctx));
EXPECT_TRUE(db->isTableExist(ctx, tbl_name));

const String to_tbl_name = "t_112";
// Rename table to another database, and mock crash by failed point
FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed);
ASSERT_THROW(
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name),
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, tbl_name),
DB::Exception);

{
// After the fail point is triggered, both meta files should exist on disk
Poco::File old_meta_file{db->getTableMetadataPath(tbl_name)};
ASSERT_TRUE(old_meta_file.exists());
Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name));
Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name));
ASSERT_TRUE(new_meta_file.exists());
// Old table should remain in db
auto old_storage = db->tryGetTable(ctx, tbl_name);
Expand All @@ -527,10 +523,10 @@ try
ThreadPool thread_pool(2);
db2->loadTables(ctx, &thread_pool, true);

Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name));
Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name));
ASSERT_FALSE(new_meta_file.exists());

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down
119 changes: 82 additions & 37 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Logger.h>
Expand All @@ -21,6 +22,7 @@
#include <Core/SortDescription.h>
#include <Functions/FunctionsConversion.h>
#include <Interpreters/sortBlock.h>
#include <Poco/Exception.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DMSegmentThreadInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
Expand All @@ -41,6 +43,7 @@

#include <atomic>
#include <ext/scope_guard.h>
#include <memory>

namespace ProfileEvents
{
Expand Down Expand Up @@ -196,7 +199,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
size_t rowkey_column_size_,
const Settings & settings_)
: global_context(db_context.getGlobalContext())
, path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))
, path_pool(std::make_shared<StoragePathPool>(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)))
, settings(settings_)
, db_name(db_name_)
, table_name(table_name_)
Expand All @@ -216,7 +219,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,

storage_pool = std::make_shared<StoragePool>(global_context,
ns_id,
path_pool,
*path_pool,
db_name_ + "." + table_name_);

// Restore existing dm files and set capacity for path_pool.
Expand Down Expand Up @@ -296,25 +299,46 @@ DeltaMergeStore::~DeltaMergeStore()

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that the callbacks are called after the `DeltaMergeStore` is dropped,
// so we must make the callbacks safe.
ExternalPageCallbacks callbacks;
// V2 callbacks for cleaning DTFiles
callbacks.scanner = [this]() {
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), file_provider = global_context.getFileProvider()]() {
ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec;
auto delegate = path_pool.getStableDiskDelegator();

// If the StoragePathPool is invalid, meaning `scanner` is called after the table is dropped,
// simply returning an empty list is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
return path_and_ids_vec;

// Return the DTFiles on disks.
auto delegate = path_pool->getStableDiskDelegator();
// Only return the DTFiles that can be GC'd. A DTFile that is being ingested or is in the middle of
// SegmentSplit/Merge/MergeDelta has a page id that is not yet applied to PageStorage; such a file
// is marked as not able to be GC'd, so we don't return it and `remover` is not run on it.
DMFile::ListOptions options;
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
auto & path_and_ids = path_and_ids_vec.emplace_back();
path_and_ids.first = root_path;
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options);
for (auto id : file_ids_in_current_path)
path_and_ids.second.insert(id);
std::set<PageId> ids_under_path;
auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options);
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
return path_and_ids_vec;
};
callbacks.remover = [this](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
auto delegate = path_pool.getStableDiskDelegator();
callbacks.remover = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), //
file_provider = global_context.getFileProvider(),
logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
// If the StoragePathPool is invalid, meaning `remover` is called after the table is dropped,
// simply skipping is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
return;

SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove");
auto delegate = path_pool->getStableDiskDelegator();
for (const auto & [path, ids] : path_and_ids_vec)
{
for (auto id : ids)
Expand All @@ -323,18 +347,50 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
continue;

// Note that page_id is useless here.
auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
if (dmfile->canGC())
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
if (unlikely(!dmfile))
{
delegate.removeDTFile(dmfile->fileId());
dmfile->remove(global_context.getFileProvider());
// If the dtfile directory does not exist, it means `StoragePathPool::drop` has been
// called in another thread. Just try to clean up the id in case it is still left.
try
{
delegate.removeDTFile(id);
}
catch (DB::Exception & e)
{
// just ignore
}
LOG_FMT_INFO(logger,
"GC try remove useless DM file, but file not found and may have been removed, dmfile={}",
DMFile::getPathByStatus(path, id, DMFile::Status::READABLE));
}
else if (dmfile->canGC())
{
// StoragePathPool::drop may be called concurrently; ignore any exception thrown and continue with the next file
String err_msg;
try
{
// scanner should only return dtfiles that can be GC'd,
// this is just a double check.
delegate.removeDTFile(dmfile->fileId());
dmfile->remove(file_provider);
}
catch (DB::Exception & e)
{
err_msg = e.message();
}
catch (Poco::Exception & e)
{
err_msg = e.message();
}
if (err_msg.empty())
LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path());
else
LOG_FMT_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={}, err_msg={}", dmfile->path(), err_msg);
}

LOG_FMT_INFO(log, "GC removed useless dmfile: {}", dmfile->path());
}
}
};
callbacks.ns_id = storage_pool->getNamespaceId();
// remember to unregister it on shutdown
storage_pool->dataRegisterExternalPagesCallbacks(callbacks);
storage_pool->enableGC();
Expand All @@ -355,20 +411,9 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
blockable_background_pool_handle->wake();
}

void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name)
void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, String new_table_name)
{
if (clean_rename)
{
path_pool.rename(new_database_name, new_table_name, clean_rename);
}
else
{
LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name);

// Remove all background task first
shutdown();
path_pool.rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk
}
path_pool->rename(new_database_name, new_table_name);

// TODO: replacing these two variables is not atomic, but could be good enough?
table_name.swap(new_table_name);
Expand Down Expand Up @@ -465,7 +510,7 @@ void DeltaMergeStore::drop()
storage_pool->drop();

// Drop data in storage path pool
path_pool.drop(/*recursive=*/true, /*must_success=*/false);
path_pool->drop(/*recursive=*/true, /*must_success=*/false);
LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name);
}

Expand Down Expand Up @@ -496,7 +541,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
// Because db_context could be a temporary object and won't last long enough during the query process.
// Like the context created by InterpreterSelectWithUnionQuery.
auto * ctx = new DMContext(db_context.getGlobalContext(),
path_pool,
*path_pool,
*storage_pool,
latest_gc_safe_point.load(std::memory_order_acquire),
settings.not_compress_columns,
Expand Down Expand Up @@ -704,7 +749,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
if (shutdown_called.load(std::memory_order_relaxed))
return {};

auto delegator = path_pool.getStableDiskDelegator();
auto delegator = path_pool->getStableDiskDelegator();
auto parent_path = delegator.choosePath();
auto new_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
return {parent_path, new_id};
Expand All @@ -715,7 +760,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil
if (shutdown_called.load(std::memory_order_relaxed))
return;

auto delegator = path_pool.getStableDiskDelegator();
auto delegator = path_pool->getStableDiskDelegator();
delegator.addDTFile(file_id, file_size, parent_path);
}

Expand Down Expand Up @@ -2529,7 +2574,7 @@ void DeltaMergeStore::restoreStableFiles()
options.only_list_can_gc = false; // We need all files to restore the bytes on disk
options.clean_up = true;
auto file_provider = global_context.getFileProvider();
auto path_delegate = path_pool.getStableDiskDelegator();
auto path_delegate = path_pool->getStableDiskDelegator();
for (const auto & root_path : path_delegate.listPaths())
{
for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options))
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class DeltaMergeStore : private boost::noncopyable
return table_name;
}

void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name);
void rename(String new_path, String new_database_name, String new_table_name);

void clearData();

Expand Down Expand Up @@ -529,7 +529,7 @@ class DeltaMergeStore : private boost::noncopyable
#endif

Context & global_context;
StoragePathPool path_pool;
std::shared_ptr<StoragePathPool> path_pool;
Settings settings;
StoragePoolPtr storage_pool;

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ DMFilePtr DMFile::restore(
const ReadMetaMode & read_meta_mode)
{
String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
bool single_file_mode = Poco::File(path).isFile();
// The path may be dropped by another thread in some cases
auto poco_file = Poco::File(path);
if (!poco_file.exists())
return nullptr;

bool single_file_mode = poco_file.isFile();
DMFilePtr dmfile(new DMFile(
file_id,
page_id,
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ FileUsageStatistics GlobalStoragePool::getLogFileUsage() const

bool GlobalStoragePool::gc()
{
return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD);
return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD);
}

bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period)
Expand Down Expand Up @@ -445,7 +445,7 @@ PageStorageRunMode StoragePool::restore()
}
else
{
LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id);
LOG_FMT_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id);
}

if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0)
Expand All @@ -461,7 +461,7 @@ PageStorageRunMode StoragePool::restore()
}
else
{
LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}]", ns_id);
LOG_FMT_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id);
}

// Check number of valid pages in v2
Expand Down
Loading

0 comments on commit 4752b4b

Please sign in to comment.