Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: Fix unexpected dmfile removed after shutdown (#6558) #6603

Merged
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ struct ContextShared
return;
shutdown_called = true;

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
// shutting down the tables.
global_storage_pool->shutdown();
}

/** At this point, some tables may have threads that block our mutex.
* To complete them correctly, we will copy the current list of tables,
* and ask them all to finish their work.
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,11 @@ void DeltaMergeStore::shutdown()
return;

LOG_TRACE(log, "Shutdown DeltaMerge start");
// shutdown before unregister to avoid conflict between this thread and background gc thread on the `ExternalPagesCallbacks`
// because PageStorage V2 doesn't have any lock protection on the `ExternalPagesCallbacks`.(The order doesn't matter for V3)
// Must shutdown storage path pool to make sure the DMFile remove callbacks
// won't remove dmfiles unexpectly.
path_pool->shutdown();
// shutdown storage pool and clean up the local DMFile remove callbacks
storage_pool->shutdown();
storage_pool->dataUnregisterExternalPagesCallbacks(storage_pool->getNamespaceId());

background_pool.removeTask(background_task_handle);
blockable_background_pool.removeTask(blockable_background_pool_handle);
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ using SegmentIdSet = std::unordered_set<UInt64>;
struct ExternalDTFileInfo;
struct GCOptions;

namespace tests
{
class DeltaMergeStoreTest;
}

inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1;

struct SegmentStats
Expand Down Expand Up @@ -155,6 +160,7 @@ struct StoreStats
class DeltaMergeStore : private boost::noncopyable
{
public:
friend class ::DB::DM::tests::DeltaMergeStoreTest;
struct Settings
{
NotCompress not_compress_columns{};
Expand Down
89 changes: 69 additions & 20 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Encryption/FileProvider.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/GCOptions.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/TMTContext.h>

#include <magic_enum.hpp>
#include <memory>

namespace CurrentMetrics
{
Expand All @@ -36,20 +39,33 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{
void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)

// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that callbacks is called after the `DeltaMergeStore` dropped, we must
// make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), file_provider = global_context.getFileProvider()]() {
private:
// !!! Warning !!!
// Should only keep a weak ref of storage path pool since
// this callback instance may still valid inside the PageStorage
// even after the DeltaMerge storage is shutdown or released.
std::weak_ptr<StoragePathPool> path_pool_weak_ref;
FileProviderPtr file_provider;

public:
LocalDMFileGcScanner(std::weak_ptr<StoragePathPool> path_pool_, FileProviderPtr provider)
: path_pool_weak_ref(std::move(path_pool_))
, file_provider(std::move(provider))
{}

ExternalPageCallbacks::PathAndIdsVec operator()()
{
ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec;

// If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table,
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `scanner` after shutdowning or dropping the table,
// simply return an empty list is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
if (!path_pool || path_pool->isShutdown())
return path_and_ids_vec;

// Return the DTFiles on disks.
Expand All @@ -66,14 +82,35 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
return path_and_ids_vec;
};
callbacks.remover = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), //
file_provider = global_context.getFileProvider(),
logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
// If the StoragePathPool is invalid, meaning we call `remover` after dropping the table,
// simply skip is OK.
}
};

// A callback class for removing the DMFiles on local filesystem
class LocalDMFileGcRemover final
{
private:
// !!! Warning !!!
// Should only keep a weak ref of storage path pool since
// this callback instance may still valid inside the PageStorage
// even after the DeltaMerge storage is shutdown or released.
std::weak_ptr<StoragePathPool> path_pool_weak_ref;
FileProviderPtr file_provider;
LoggerPtr logger;

public:
LocalDMFileGcRemover(std::weak_ptr<StoragePathPool> path_pool_, FileProviderPtr provider, LoggerPtr log)
: path_pool_weak_ref(std::move(path_pool_))
, file_provider(std::move(provider))
, logger(std::move(log))
{}

void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids)
{
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `remover` after shutdowning or dropping the table,
// we must skip because the `valid_ids` is not reliable!
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
if (!path_pool || path_pool->isShutdown())
return;

SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove");
Expand Down Expand Up @@ -102,6 +139,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
LOG_INFO(logger,
"GC try remove useless DM file, but file not found and may have been removed, dmfile={}",
DMFile::getPathByStatus(path, id, DMFile::Status::READABLE));
continue; // next file
}
else if (dmfile->canGC())
{
Expand All @@ -126,19 +164,30 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
LOG_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path());
else
LOG_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={} err_msg={}", dmfile->path(), err_msg);
continue; // next file
}
}
}
};
}
};

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that callbacks is called after the `DeltaMergeStore` shutdown or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider());
callbacks.remover = LocalDMFileGcRemover(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider(), log);
// remember to unregister it when shutdown
storage_pool->dataRegisterExternalPagesCallbacks(callbacks);
storage_pool->enableGC();
storage_pool->startup(std::move(callbacks));

background_task_handle = background_pool.addTask([this] { return handleBackgroundTask(false); });

blockable_background_pool_handle = blockable_background_pool.addTask([this] { return handleBackgroundTask(true); });

// Do place delta index.
// Generate place delta index tasks
for (auto & [end, segment] : segments)
{
(void)end;
Expand Down
77 changes: 42 additions & 35 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa

GlobalStoragePool::~GlobalStoragePool()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
shutdown();
}

void GlobalStoragePool::restore()
Expand All @@ -132,6 +128,15 @@ void GlobalStoragePool::restore()
false);
}

void GlobalStoragePool::shutdown()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = {};
}
}

FileUsageStatistics GlobalStoragePool::getLogFileUsage() const
{
return log_storage->getFileUsageStatistics();
Expand Down Expand Up @@ -535,50 +540,68 @@ StoragePool::~StoragePool()
shutdown();
}

void StoragePool::enableGC()
{
// The data in V3 will be GCed by `GlobalStoragePool::gc`, only register gc task under only v2/mix mode
if (run_mode == PageStorageRunMode::ONLY_V2 || run_mode == PageStorageRunMode::MIX_MODE)
{
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
}
}

void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks)
void StoragePool::startup(ExternalPageCallbacks && callbacks)
{
switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
// For V2, we need a per physical table gc handle to perform the gc of its PageStorage instances.
data_storage_v2->registerExternalPagesCallbacks(callbacks);
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
break;
}
case PageStorageRunMode::ONLY_V3:
{
// For V3, the GC is handled by `GlobalStoragePool::gc`, just register callbacks is OK.
data_storage_v3->registerExternalPagesCallbacks(callbacks);
break;
}
case PageStorageRunMode::MIX_MODE:
{
// We have transformed all pages from V2 to V3 in `restore`, so
// only need to register callbacks for V3.
// For V3, the GC is handled by `GlobalStoragePool::gc`.
// Since we have transformed all external pages from V2 to V3 in `StoragePool::restore`,
// just register callbacks to V3 is OK
data_storage_v3->registerExternalPagesCallbacks(callbacks);
// we still need a gc_handle to reclaim the V2 disk space.
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
break;
}
default:
throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
}
}

void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id)
void StoragePool::shutdown()
{
// Note: Should reset the gc_handle before unregistering the pages callbacks
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}

switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
data_storage_v2->unregisterExternalPagesCallbacks(ns_id);
break;
}
case PageStorageRunMode::ONLY_V3:
{
data_storage_v3->unregisterExternalPagesCallbacks(ns_id);
break;
}
case PageStorageRunMode::MIX_MODE:
{
// We have transformed all pages from V2 to V3 in `restore`, so
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
// We have transformed all external pages from V2 to V3 in `restore`, so
// only need to unregister callbacks for V3.
data_storage_v3->unregisterExternalPagesCallbacks(ns_id);
break;
Expand All @@ -588,7 +611,6 @@ void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id)
}
}


bool StoragePool::doV2Gc(const Settings & settings)
{
bool done_anything = false;
Expand Down Expand Up @@ -629,21 +651,6 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)
return doV2Gc(settings);
}

void StoragePool::shutdown()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
if (run_mode != PageStorageRunMode::ONLY_V3)
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
}
}

void StoragePool::drop()
{
shutdown();
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Poco/Logger.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageStorage.h>
Expand All @@ -36,7 +35,7 @@ namespace DM
class StoragePool;
using StoragePoolPtr = std::shared_ptr<StoragePool>;

static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60);
static constexpr std::chrono::seconds DELTA_MERGE_GC_PERIOD(60);

class GlobalStoragePool : private boost::noncopyable
{
Expand All @@ -51,6 +50,8 @@ class GlobalStoragePool : private boost::noncopyable

void restore();

void shutdown();

friend class StoragePool;
friend class ::DB::AsynchronousMetrics;

Expand Down Expand Up @@ -90,7 +91,7 @@ class StoragePool : private boost::noncopyable

NamespaceId getNamespaceId() const { return ns_id; }

PageStorageRunMode getPageStorageRunMode()
PageStorageRunMode getPageStorageRunMode() const
{
return run_mode;
}
Expand Down Expand Up @@ -141,16 +142,15 @@ class StoragePool : private boost::noncopyable
PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id);
PageReader newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot);

void enableGC();

void dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks);
// Register the clean up DMFiles callbacks to PageStorage.
// The callbacks will be unregister when `shutdown` is called.
void startup(ExternalPageCallbacks && callbacks);

void dataUnregisterExternalPagesCallbacks(NamespaceId ns_id);
// Shutdown the gc handle and DMFile callbacks
void shutdown();

bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

void shutdown();

// Caller must cancel gc tasks before drop
void drop();

Expand Down
Loading