Skip to content

Commit

Permalink
ddl: Fix frequent schema GC under multi tenant (pingcap#8248) (pingca…
Browse files Browse the repository at this point in the history
…p#113)

* ddl: Fix frequent schema GC under multi tenant (pingcap#8248)

close pingcap#8256

* ddl: Fix corner case when multiple regions create one table simultaneously (pingcap#8229)

close pingcap#8217

---------

Co-authored-by: hongyunyan <[email protected]>
  • Loading branch information
JaySon-Huang and hongyunyan authored Oct 27, 2023
1 parent f16e1b9 commit 38536da
Show file tree
Hide file tree
Showing 15 changed files with 324 additions and 156 deletions.
8 changes: 2 additions & 6 deletions dbms/src/Databases/DatabaseMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,9 @@ void DatabaseMemory::loadTables(
/// Nothing to load.
}

void DatabaseMemory::createTable(
const Context & /*context*/,
const String & table_name,
const StoragePtr & table,
const ASTPtr & /*query*/)
void DatabaseMemory::createTable(const Context & /*context*/, const String & /*table_name*/, const ASTPtr & /*query*/)
{
attachTable(table_name, table);
/// Nothing to do.
}

void DatabaseMemory::removeTable(const Context & /*context*/, const String & table_name)
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Databases/DatabaseMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class DatabaseMemory : public DatabaseWithOwnTablesBase

void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;

void createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
override;
void createTable(const Context & context, const String & table_name, const ASTPtr & query) override;

void removeTable(const Context & context, const String & table_name) override;

Expand Down
18 changes: 3 additions & 15 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,7 @@ void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, b
DatabaseLoading::cleanupTables(*this, name, tables_failed_to_startup, log);
}

void DatabaseOrdinary::createTable(
const Context & context,
const String & table_name,
const StoragePtr & table,
const ASTPtr & query)
void DatabaseOrdinary::createTable(const Context & context, const String & table_name, const ASTPtr & query)
{
const auto & settings = context.getSettingsRef();

Expand Down Expand Up @@ -234,15 +230,6 @@ void DatabaseOrdinary::createTable(

try
{
/// Add a table to the map of known tables.
{
std::lock_guard lock(mutex);
if (!tables.emplace(table_name, table).second)
throw Exception(
fmt::format("Table {}.{} already exists.", name, table_name),
ErrorCodes::TABLE_ALREADY_EXISTS);
}

context.getFileProvider()->renameFile(
table_metadata_tmp_path,
EncryptionPath(table_metadata_tmp_path, ""),
Expand Down Expand Up @@ -331,7 +318,8 @@ void DatabaseOrdinary::renameTable(

/// NOTE Non-atomic.
// Create new metadata and remove old metadata.
to_database_concrete->createTable(context, to_table_name, table, ast);
to_database_concrete->createTable(context, to_table_name, ast);
to_database_concrete->attachTable(to_table_name, table);
removeTable(context, table_name);
}

Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Databases/DatabaseOrdinary.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class DatabaseOrdinary : public DatabaseWithOwnTablesBase

void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;

void createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
override;
void createTable(const Context & context, const String & table_name, const ASTPtr & query) override;

void removeTable(const Context & context, const String & table_name) override;

Expand Down
15 changes: 1 addition & 14 deletions dbms/src/Databases/DatabaseTiFlash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,7 @@ void DatabaseTiFlash::loadTables(Context & context, ThreadPool * thread_pool, bo
}


void DatabaseTiFlash::createTable(
const Context & context,
const String & table_name,
const StoragePtr & table,
const ASTPtr & query)
void DatabaseTiFlash::createTable(const Context & context, const String & table_name, const ASTPtr & query)
{
const auto & settings = context.getSettingsRef();

Expand Down Expand Up @@ -259,15 +255,6 @@ void DatabaseTiFlash::createTable(

try
{
/// Add a table to the map of known tables.
{
std::lock_guard lock(mutex);
if (!tables.emplace(table_name, table).second)
throw Exception(
"Table " + name + "." + table_name + " already exists.",
ErrorCodes::TABLE_ALREADY_EXISTS);
}

/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
context.getFileProvider()->renameFile(
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Databases/DatabaseTiFlash.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ class DatabaseTiFlash : public DatabaseWithOwnTablesBase

void loadTables(Context & context, ThreadPool * thread_pool, bool has_force_restore_data_flag) override;

void createTable(const Context & context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
override;
void createTable(const Context & context, const String & table_name, const ASTPtr & query) override;

void removeTable(const Context & context, const String & table_name) override;

Expand Down
11 changes: 3 additions & 8 deletions dbms/src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class IDatabaseIterator
virtual const String & name() const = 0;
virtual StoragePtr & table() const = 0;

virtual ~IDatabaseIterator() {}
virtual ~IDatabaseIterator() = default;
};

using DatabaseIteratorPtr = std::unique_ptr<IDatabaseIterator>;
Expand Down Expand Up @@ -88,12 +88,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>
virtual bool empty(const Context & context) const = 0;

/// Record the presence of the table in the metadata. The storage instance itself is not passed in; callers register it with the database separately via `attachTable`.
virtual void createTable(
const Context & context,
const String & name,
const StoragePtr & table,
const ASTPtr & query)
= 0;
virtual void createTable(const Context & context, const String & name, const ASTPtr & query) = 0;

/// Delete the table from the database. Delete the metadata.
virtual void removeTable(const Context & context, const String & name) = 0;
Expand Down Expand Up @@ -154,7 +149,7 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>
/// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any.
virtual void drop(const Context & context) = 0;

virtual ~IDatabase() {}
virtual ~IDatabase() = default;
};

using DatabasePtr = std::shared_ptr<IDatabase>;
Expand Down
47 changes: 29 additions & 18 deletions dbms/src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,31 +576,31 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
// Thus, we choose to retry here to wait for the table creation to complete.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE)
{
auto log = Logger::get(fmt::format("InterpreterCreateQuery {} {}", database_name, table_name));
LOG_WARNING(
Logger::get("InterpreterCreateQuery"),
log,
"createTable failed with error code is {}, error info is {}, stack_info is {}",
e.code(),
e.displayText(),
e.getStackTrace().toString());
for (int i = 0; i < 20; i++) // retry for 400ms
const size_t max_retry = 50;
const int wait_useconds = 20000;
for (size_t i = 0; i < max_retry; i++) // retry
{
if (context.isTableExist(database_name, table_name))
{
return {};
}
else
{
const int wait_useconds = 20000;
LOG_ERROR(
Logger::get("InterpreterCreateQuery"),
"createTable failed but table not exist now, \nWe will sleep for {} ms and try again.",
wait_useconds / 1000);
usleep(wait_useconds); // sleep 20ms
}

// sleep a while and retry
LOG_ERROR(
log,
"createTable failed but table not exist now, \nWe will sleep for {} ms and try again.",
wait_useconds / 1000);
usleep(wait_useconds); // sleep 20ms
}
LOG_ERROR(
Logger::get("InterpreterCreateQuery"),
"still failed to createTable in InterpreterCreateQuery for retry 20 times");
log,
"still failed to createTable in InterpreterCreateQuery for retry {} times",
max_retry);
e.rethrow();
}
else
Expand All @@ -624,13 +624,24 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
create.attach,
false);

// start up before adding to `database`, or the storage can not be retrieved from `ManagedStorages::get`
res->startup();
// When creating tables, we should strictly follow the following steps:
// 1. Create a .sql file
// 2. Register the instance to ManagedStorages
// 3. Register the instance to IDatabase
// Once the instance is registered in `ManagedStorages`, we will try to apply DDL alter changes to its .sql files
// If we do step 2 before step 1, we may run into "can't find .sql file" error when applying DDL jobs.
// Besides, we make step 3 the final one, to ensure that once we pass the check of `context.isTableExist(database_name, table_name)`, the table must be created completely.

if (create.is_temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
database->createTable(context, table_name, query_ptr);

// register the storage instance into `ManagedStorages`
res->startup();

if (!create.is_temporary)
database->attachTable(table_name, res);
}

/// If the query is a CREATE SELECT, insert the data into the table.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ class KVStore final : private boost::noncopyable

/// Create `runner_cnt` threads to run ReadIndexWorker asynchronously and automatically.
/// If there is other runtime framework, DO NOT invoke it.
void asyncRunReadIndexWorkers();
void asyncRunReadIndexWorkers() const;

/// Stop workers after there is no more read-index task.
void stopReadIndexWorkers();
void stopReadIndexWorkers() const;

/// TODO: if supported by runtime framework, run one round for specific runner by `id`.
void runOneRoundOfReadIndexRunner(size_t runner_id);
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ ReadIndexWorkerManager::ReadIndexWorkerManager(
ReadIndexWorkerManager::FnGetTickTime && fn_min_dur_handle_region,
size_t runner_cnt)
: proxy_helper(proxy_helper_)
, logger(&Poco::Logger::get("ReadIndexWorkers"))
, logger(Logger::get("ReadIndexWorkers"))
{
for (size_t i = 0; i < runner_cnt; ++i)
runners.emplace_back(std::make_unique<ReadIndexRunner>(
Expand Down Expand Up @@ -931,7 +931,7 @@ void KVStore::initReadIndexWorkers(
read_index_worker_manager = ptr;
}

void KVStore::asyncRunReadIndexWorkers()
void KVStore::asyncRunReadIndexWorkers() const
{
if (!read_index_worker_manager)
return;
Expand All @@ -940,13 +940,12 @@ void KVStore::asyncRunReadIndexWorkers()
read_index_worker_manager->asyncRun();
}

void KVStore::stopReadIndexWorkers()
void KVStore::stopReadIndexWorkers() const
{
if (!read_index_worker_manager)
return;

assert(this->proxy_helper);

read_index_worker_manager->stop();
}

Expand Down Expand Up @@ -1012,13 +1011,13 @@ ReadIndexWorkerManager::ReadIndexRunner::ReadIndexRunner(
size_t id_,
size_t runner_cnt_,
ReadIndexWorkers & workers_,
Poco::Logger * logger_,
LoggerPtr logger_,
FnGetTickTime fn_min_dur_handle_region_,
AsyncWaker::NotifierPtr global_notifier_)
: id(id_)
, runner_cnt(runner_cnt_)
, workers(workers_)
, logger(logger_)
, logger(std::move(logger_))
, fn_min_dur_handle_region(std::move(fn_min_dur_handle_region_))
, global_notifier(std::move(global_notifier_))
{}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/KVStore/Read/ReadIndexWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ class ReadIndexWorkerManager : boost::noncopyable
size_t id_,
size_t runner_cnt_,
ReadIndexWorkers & workers_,
Poco::Logger * logger_,
LoggerPtr logger_,
FnGetTickTime fn_min_dur_handle_region_,
AsyncWaker::NotifierPtr global_notifier_);

const size_t id;
const size_t runner_cnt;
ReadIndexWorkers & workers;
Poco::Logger * logger;
LoggerPtr logger;
const FnGetTickTime fn_min_dur_handle_region;
/// The workers belonged to runner share same notifier.
AsyncWaker::NotifierPtr global_notifier;
Expand All @@ -157,7 +157,7 @@ class ReadIndexWorkerManager : boost::noncopyable
std::vector<std::unique_ptr<ReadIndexRunner>> runners;
/// Each worker controls read-index process of region(region_id % worker_cnt == worker_id).
ReadIndexWorkers workers;
Poco::Logger * logger;
LoggerPtr logger;
};

struct ReadIndexNotifyCtrl;
Expand Down Expand Up @@ -359,4 +359,4 @@ struct MockStressTestCfg
static bool enable;
};

} // namespace DB
} // namespace DB
Loading

0 comments on commit 38536da

Please sign in to comment.