Skip to content

Commit

Permalink
ddl: Fix the physically drop storage instance may block removing regi…
Browse files Browse the repository at this point in the history
…ons (#9442)

close #9437

ddl: Fix the physical drop storage instance may block removing regions
Make sure physical drop storage instance only happen after all related regions are removed
  • Loading branch information
JaySon-Huang authored Sep 20, 2024
1 parent e561201 commit 62809fe
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 38 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ namespace DB
M(skip_check_segment_update) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(force_set_num_regions_for_table) \
M(minimum_block_size_for_cross_join) \
M(random_exception_after_dt_write_done) \
M(random_slow_page_storage_write) \
Expand Down
72 changes: 61 additions & 11 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,47 @@ MockTiDB::MockTiDB()
databases["default"] = 0;
}

TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions)
TablePtr MockTiDB::dropTableByNameImpl(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
String qualified_name = database_name + "." + table_name;
auto it_by_name = tables_by_name.find(qualified_name);
if (it_by_name == tables_by_name.end())
return nullptr;
auto table = it_by_name->second;
dropTableInternal(context, table, drop_regions);

tables_by_name.erase(it_by_name);
return table;
}

TablePtr MockTiDB::dropTableByIdImpl(Context & context, const TableID table_id, bool drop_regions)
{
auto iter = tables_by_id.find(table_id);
if (iter == tables_by_id.end())
return nullptr;

auto table = iter->second;
dropTableInternal(context, table, drop_regions);

// erase from `tables_by_name`
for (auto iter_by_name = tables_by_name.begin(); iter_by_name != tables_by_name.end(); /* empty */)
{
if (table != iter_by_name->second)
{
++iter_by_name;
continue;
}
LOG_INFO(Logger::get(), "removing table from MockTiDB, name={} table_id={}", iter_by_name->first, table_id);
iter_by_name = tables_by_name.erase(iter_by_name);
}
return table;
}

TablePtr MockTiDB::dropTableInternal(Context & context, const TablePtr & table, bool drop_regions)
{
auto & kvstore = context.getTMTContext().getKVStore();
auto & region_table = context.getTMTContext().getRegionTable();

auto table = it_by_name->second;
if (table->isPartitionTable())
{
for (const auto & partition : table->table_info.partition.definitions)
Expand All @@ -90,15 +120,12 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const String & database_
}
tables_by_id.erase(table->id());

tables_by_name.erase(it_by_name);

if (drop_regions)
{
for (auto & e : region_table.getRegionsByTable(NullspaceID, table->id()))
kvstore->mockRemoveRegion(e.first, region_table);
region_table.removeTable(NullspaceID, table->id());
}

return table;
}

Expand All @@ -113,7 +140,7 @@ void MockTiDB::dropDB(Context & context, const String & database_name, bool drop
});

for (const auto & table_name : table_names)
dropTableInternal(context, database_name, table_name, drop_regions);
dropTableByNameImpl(context, database_name, table_name, drop_regions);

version++;

Expand All @@ -132,8 +159,25 @@ void MockTiDB::dropDB(Context & context, const String & database_name, bool drop
void MockTiDB::dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);
auto table = dropTableByNameImpl(context, database_name, table_name, drop_regions);
if (!table)
return;

version++;

auto table = dropTableInternal(context, database_name, table_name, drop_regions);
SchemaDiff diff;
diff.type = SchemaActionType::DropTable;
diff.schema_id = table->database_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

void MockTiDB::dropTableById(Context & context, const TableID & table_id, bool drop_regions)
{
std::lock_guard lock(tables_mutex);

auto table = dropTableByIdImpl(context, table_id, drop_regions);
if (!table)
return;

Expand Down Expand Up @@ -273,13 +317,15 @@ TableID MockTiDB::newTable(
return addTable(database_name, std::move(*table_info));
}

int MockTiDB::newTables(
std::vector<TableID> MockTiDB::newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);
std::vector<TableID> table_ids;
table_ids.reserve(tables.size());
if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
Expand All @@ -300,7 +346,8 @@ int MockTiDB::newTables(
table_info.id = table_id_allocator++;
table_info.update_timestamp = tso;

auto table = std::make_shared<Table>(database_name, databases[database_name], table_info.name, std::move(table_info));
auto table
= std::make_shared<Table>(database_name, databases[database_name], table_info.name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
tables_by_name.emplace(qualified_name, table);

Expand All @@ -310,6 +357,8 @@ int MockTiDB::newTables(
opt.old_schema_id = table->database_id;
opt.old_table_id = table->id();
diff.affected_opts.push_back(std::move(opt));

table_ids.push_back(table->id());
}

if (diff.affected_opts.empty())
Expand All @@ -318,7 +367,8 @@ int MockTiDB::newTables(
diff.schema_id = diff.affected_opts[0].schema_id;
diff.version = version;
version_diff[version] = diff;
return 0;

return table_ids;
}

TableID MockTiDB::addTable(const String & database_name, TiDB::TableInfo && table_info)
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
#include <TiDB/Schema/SchemaGetter.h>
Expand Down Expand Up @@ -82,7 +83,7 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & handle_pk_name,
const String & engine_type);

int newTables(
std::vector<TableID> newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
Expand All @@ -104,6 +105,7 @@ class MockTiDB : public ext::Singleton<MockTiDB>
void dropPartition(const String & database_name, const String & table_name, TableID partition_id);

void dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions);
void dropTableById(Context & context, const TableID & table_id, bool drop_regions);

void dropDB(Context & context, const String & database_name, bool drop_regions);

Expand Down Expand Up @@ -151,7 +153,9 @@ class MockTiDB : public ext::Singleton<MockTiDB>

private:
TableID newPartitionImpl(const TablePtr & logical_table, TableID partition_id, const String & partition_name, Timestamp tso, bool is_add_part);
TablePtr dropTableInternal(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr dropTableByNameImpl(Context & context, const String & database_name, const String & table_name, bool drop_regions);
TablePtr dropTableByIdImpl(Context & context, TableID table_id, bool drop_regions);
TablePtr dropTableInternal(Context & context, const TablePtr & table, bool drop_regions);
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);
TablePtr getTableByID(TableID table_id);

Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,20 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer

// Trigger gc on all databases / tables.
// Usage:
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])"
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point, ignore_remain_regions])"
void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
auto & service = context.getSchemaSyncService();
Timestamp gc_safe_point = 0;
bool ignore_remain_regions = false;
if (args.empty())
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
if (!args.empty())
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
service->gc(gc_safe_point, NullspaceID);
if (args.size() >= 2)
ignore_remain_regions = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[1]).value) == "true";
// Note that only call it in tests, we need to ignore remain regions
service->gcImpl(gc_safe_point, NullspaceID, ignore_remain_regions);

output("schemas gc done");
}
Expand Down Expand Up @@ -139,4 +143,4 @@ void dbgFuncIsTombstone(Context & context, const ASTs & args, DBGInvoker::Printe
}


} // namespace DB
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Pri

// Trigger gc on all databases / tables.
// Usage:
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])"
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point, ignore_remain_regions])"
void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Reset schemas.
Expand Down
36 changes: 35 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/setThreadName.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ExternalDTFileInfo.h>
Expand All @@ -26,7 +27,11 @@
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiKVRange.h>
#include <Storages/Transaction/Types.h>
#include <TiDB/Schema/SchemaSyncer.h>
#include <fiu.h>

#include <any>

namespace DB
{
Expand All @@ -37,6 +42,10 @@ extern const int UNKNOWN_TABLE;
extern const int ILLFORMAT_RAFT_ROW;
extern const int TABLE_IS_DROPPED;
} // namespace ErrorCodes
namespace FailPoints
{
extern const char force_set_num_regions_for_table[];
} // namespace FailPoints

RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id)
{
Expand Down Expand Up @@ -285,8 +294,8 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
{
tables.erase(ks_tb_id);
}
LOG_INFO(log, "remove [region {}] in RegionTable done", region_id);
}
LOG_INFO(log, "remove [region {}] in RegionTable done", region_id);

// Sometime we don't need to remove data. e.g. remove region after region merge.
if (remove_data)
Expand Down Expand Up @@ -431,6 +440,31 @@ void RegionTable::handleInternalRegionsByTable(const KeyspaceID keyspace_id, con
}
}

std::vector<RegionID> RegionTable::getRegionIdsByTable(KeyspaceID keyspace_id, TableID table_id) const
{
fiu_do_on(FailPoints::force_set_num_regions_for_table, {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::force_set_num_regions_for_table); v)
{
auto num_regions = std::any_cast<std::vector<RegionID>>(v.value());
return num_regions;
}
});

std::lock_guard lock(mutex);
if (auto iter = tables.find(KeyspaceTableID{keyspace_id, table_id}); //
unlikely(iter != tables.end()))
{
std::vector<RegionID> ret_regions;
ret_regions.reserve(iter->second.regions.size());
for (const auto & r : iter->second.regions)
{
ret_regions.emplace_back(r.first);
}
return ret_regions;
}
return {};
}

std::vector<std::pair<RegionID, RegionPtr>> RegionTable::getRegionsByTable(const KeyspaceID keyspace_id, const TableID table_id) const
{
auto & kvstore = context->getTMTContext().getKVStore();
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class RegionTable : private boost::noncopyable
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist);

void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function<void(const InternalRegions &)> && callback) const;

std::vector<RegionID> getRegionIdsByTable(KeyspaceID keyspace_id, TableID table_id) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const;

/// Write the data of the given region into the table with the given table ID, fill the data list for outer to remove.
Expand Down
Loading

0 comments on commit 62809fe

Please sign in to comment.