Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix potential data loss of alter_partition_by (release-7.5) #8350

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 97 additions & 32 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,14 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
}
else
{
/// The new non-partitioned table will have a new id
// Create the new table.
// If the new table is a partition table, this will also overwrite
// the partition id mapping to the new logical table and renew the
// partition info.
applyPartitionAlter(diff.schema_id, diff.table_id);
// Drop the old table. If the previous partitions of the old table are
// no longer mapped to the old logical table, they will not be removed.
applyDropTable(diff.schema_id, diff.old_table_id);
applyCreateTable(diff.schema_id, diff.table_id);
}
break;
}
Expand Down Expand Up @@ -373,6 +378,51 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
}
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyPartitionAlter(DatabaseID database_id, TableID table_id)
{
auto table_info = getter.getTableInfo(database_id, table_id);
if (table_info == nullptr) // the database maybe dropped
{
LOG_DEBUG(log, "table is not exist in TiKV, may have been dropped, table_id={}", table_id);
return;
}

table_id_map.emplaceTableID(table_id, database_id);
LOG_DEBUG(log, "register table to table_id_map, database_id={} table_id={}", database_id, table_id);

if (!table_info->isLogicalPartitionTable())
{
return;
}

// If table is partition table, we will create the logical table here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If table is partition table, we will create the logical table here.
// If table is partition table, we will create the physical table here.

typo?

Copy link
Contributor Author

@JaySon-Huang JaySon-Huang Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`applyCreatePhysicalTable` only creates the IStorage for the logical table. The function name is kind of misleading.

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyCreatePhysicalTable(
const TiDB::DBInfoPtr & db_info,
const TableInfoPtr & table_info)
{
GET_METRIC(tiflash_schema_internal_ddl_count, type_create_table).Increment();
LOG_INFO(
log,
"Creating table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
/// Check if this is a RECOVER table.
{
auto & tmt_context = context.getTMTContext();
if (auto * storage = tmt_context.getStorages().get(keyspace_id, table_info->id).get(); storage)
{
if (!storage->isTombstone())
{
LOG_DEBUG(
log,
"Trying to create table {}, but it already exists and is not marked as tombstone, database_id={} "
"table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
return;
}
LOG_DEBUG(
log,
"Recovering table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
AlterCommands commands;
{
AlterCommand command;
command.type = AlterCommand::RECOVER;
commands.emplace_back(std::move(command));
}
auto alter_lock = storage->lockForAlter(getThreadNameAndID());
storage->updateTombstone(
alter_lock,
commands,
name_mapper.mapDatabaseName(*db_info),
*table_info,
name_mapper,
context);
LOG_INFO(
log,
"Created table {}, database_id={} table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id);
return;
}
}
/// Normal CREATE table.
if (table_info->engine_type == StorageEngine::UNSPECIFIED)
{
auto & tmt_context = context.getTMTContext();
table_info->engine_type = tmt_context.getEngineType();
}
String stmt = createTableStmt(*db_info, *table_info, name_mapper, log);
LOG_INFO(
log,
"Creating table {} (database_id={} table_id={}) with statement: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
stmt);
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, stmt.data(), stmt.data() + stmt.size(), "from syncSchema " + table_info->name, 0);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get

// Because we have already got the table_info, we can be sure new_db_info will not be nullptr.
auto new_db_info = getter.getDatabase(database_id);
applyCreatePhysicalTable(new_db_info, table_info);

for (const auto & part_def : table_info->partition.definitions)
{
LOG_DEBUG(
log,
"register table to table_id_map for partition table, logical_table_id={} physical_table_id={}",
table_id,
part_def.id);
table_id_map.emplacePartitionTableID(part_def.id, table_id);
}

auto & tmt_context = context.getTMTContext();
auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id);
if (storage == nullptr)
{
LOG_ERROR(log, "table is not exist in TiFlash, table_id={}", table_id);
return;
}

// Try to renew the partition info for the new table
applyPartitionDiff(new_db_info, table_info, storage);
}

template <typename Getter, typename NameMapper>
void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(DatabaseID database_id, TableID table_id)
{
Expand Down Expand Up @@ -410,70 +460,70 @@ void SchemaBuilder<Getter, NameMapper>::applyPartitionDiff(
const TableInfoPtr & table_info,
const ManageableStoragePtr & storage)
{
const auto & orig_table_info = storage->getTableInfo();
if (!orig_table_info.isLogicalPartitionTable())
const auto & local_table_info = storage->getTableInfo();
// ALTER TABLE t PARTITION BY ... may turn a non-partition table into partition table
// with some partition ids in `partition.adding_definitions`/`partition.definitions`
// and `partition.dropping_definitions`. We need to create those partitions.
if (!local_table_info.isLogicalPartitionTable())
{
LOG_ERROR(
LOG_INFO(
log,
"old table in TiFlash not partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, orig_table_info),
"Altering non-partition table to be a partition table {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, local_table_info),
db_info->id,
orig_table_info.id);
return;
local_table_info.id);
}

const auto & orig_defs = orig_table_info.partition.definitions;
const auto & local_defs = local_table_info.partition.definitions;
const auto & new_defs = table_info->partition.definitions;

std::unordered_set<TableID> orig_part_id_set, new_part_id_set;
std::vector<String> orig_part_ids, new_part_ids;
std::for_each(orig_defs.begin(), orig_defs.end(), [&orig_part_id_set, &orig_part_ids](const auto & def) {
orig_part_id_set.emplace(def.id);
orig_part_ids.emplace_back(std::to_string(def.id));
std::unordered_set<TableID> local_part_id_set, new_part_id_set;
std::for_each(local_defs.begin(), local_defs.end(), [&local_part_id_set](const auto & def) {
local_part_id_set.emplace(def.id);
});
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set, &new_part_ids](const auto & def) {
std::for_each(new_defs.begin(), new_defs.end(), [&new_part_id_set](const auto & def) {
new_part_id_set.emplace(def.id);
new_part_ids.emplace_back(std::to_string(def.id));
});

auto orig_part_ids_str = boost::algorithm::join(orig_part_ids, ", ");
auto new_part_ids_str = boost::algorithm::join(new_part_ids, ", ");

LOG_INFO(
log,
"Applying partition changes {} with database_id={}, table_id={}, old: {}, new: {}",
name_mapper.debugCanonicalName(*db_info, *table_info),
db_info->id,
table_info->id,
orig_part_ids_str,
new_part_ids_str);
local_part_id_set,
new_part_id_set);
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

if (orig_part_id_set == new_part_id_set)
if (local_part_id_set == new_part_id_set)
{
LOG_INFO(
log,
"No partition changes {} with database_id={}, table_id={}",
"No partition changes, paritions_size={} {} with database_id={}, table_id={}",
name_mapper.debugCanonicalName(*db_info, *table_info),
new_part_id_set.size(),
db_info->id,
table_info->id);
return;
}

auto updated_table_info = orig_table_info;
// Copy the local table info and update fields on the copy
auto updated_table_info = local_table_info;
updated_table_info.is_partition_table = true;
updated_table_info.belonging_table_id = table_info->belonging_table_id;
updated_table_info.partition = table_info->partition;

/// Apply changes to physical tables.
for (const auto & orig_def : orig_defs)
for (const auto & local_def : local_defs)
{
if (new_part_id_set.count(orig_def.id) == 0)
if (!new_part_id_set.contains(local_def.id))
{
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), orig_def.id);
applyDropPhysicalTable(name_mapper.mapDatabaseName(*db_info), local_def.id);
}
}

for (const auto & new_def : new_defs)
{
if (orig_part_id_set.count(new_def.id) == 0)
if (!local_part_id_set.contains(new_def.id))
{
table_id_map.emplacePartitionTableID(new_def.id, updated_table_info.id);
}
Expand Down Expand Up @@ -790,7 +840,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropSchema(const String & db_name)
auto db = context.tryGetDatabase(db_name);
if (db == nullptr)
{
LOG_INFO(log, "Database {} does not exists", db_name);
LOG_INFO(log, "Database does not exist, db_name={}", db_name);
return;
}

Expand Down Expand Up @@ -992,7 +1042,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDropPhysicalTable(const String & db
auto storage = tmt_context.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
GET_METRIC(tiflash_schema_internal_ddl_count, type_drop_table).Increment();
Expand Down Expand Up @@ -1033,14 +1083,29 @@ void SchemaBuilder<Getter, NameMapper>::applyDropTable(DatabaseID database_id, T
auto * storage = tmt_context.getStorages().get(keyspace_id, table_id).get();
if (storage == nullptr)
{
LOG_DEBUG(log, "table {} does not exist.", table_id);
LOG_DEBUG(log, "table does not exist, table_id={}", table_id);
return;
}
const auto & table_info = storage->getTableInfo();
if (table_info.isLogicalPartitionTable())
{
for (const auto & part_def : table_info.partition.definitions)
{
if (TableID latest_logical_table_id = table_id_map.findTableIDInPartitionMap(part_def.id);
latest_logical_table_id == -1 || latest_logical_table_id != table_info.id)
{
// The partition is managed by another logical table now (caused by `alter table X partition by ...`),
// skip dropping this partition when dropping the old logical table
LOG_INFO(
log,
"The partition is not managed by current logical table, skip, partition_table_id={} "
"new_logical_table_id={} current_logical_table_id={}",
part_def.id,
latest_logical_table_id,
table_info.id);
continue;
}

applyDropPhysicalTable(name_mapper.mapDatabaseName(database_id, keyspace_id), part_def.id);
}
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ struct SchemaBuilder
/// Parameter schema_name should be mapped.
void applyDropPhysicalTable(const String & db_name, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);
void applyPartitionAlter(DatabaseID database_id, TableID table_id);

void applyPartitionDiff(DatabaseID database_id, TableID table_id);
void applyPartitionDiff(
const TiDB::DBInfoPtr & db_info,
const TiDB::TableInfoPtr & table_info,
Expand Down
24 changes: 13 additions & 11 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,10 @@ std::optional<SchemaDiff> SchemaGetter::getSchemaDiff(Int64 ver)
String data = TxnStructure::get(snap, key);
if (data.empty())
{
LOG_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key);
LOG_WARNING(log, "The schema diff is empty, schema_version={} key={}", ver, key);
return std::nullopt;
}
LOG_TRACE(log, "Get SchemaDiff from TiKV, schema_version={} data={}", ver, data);
SchemaDiff diff;
diff.deserialize(data);
return diff;
Expand All @@ -274,7 +275,7 @@ TiDB::DBInfoPtr SchemaGetter::getDatabase(DatabaseID db_id)
if (json.empty())
return nullptr;

LOG_DEBUG(log, "Get DB Info from TiKV : " + json);
LOG_DEBUG(log, "Get DB Info from TiKV: {}", json);
auto db_info = std::make_shared<TiDB::DBInfo>(json, keyspace_id);
return db_info;
}
Expand All @@ -284,25 +285,26 @@ TiDB::TableInfoPtr SchemaGetter::getTableInfo(DatabaseID db_id, TableID table_id
String db_key = getDBKey(db_id);
if (!checkDBExists(db_key))
{
LOG_ERROR(log, "The database {} does not exist.", db_id);
LOG_ERROR(log, "The database does not exist, database_id={}", db_id);
return nullptr;
}
String table_key = getTableKey(table_id);
String table_info_json = TxnStructure::hGet(snap, db_key, table_key);
if (table_info_json.empty())
{
LOG_WARNING(log, "The table {} is dropped in TiKV, try to get the latest table_info", table_id);
LOG_WARNING(log, "The table is dropped in TiKV, try to get the latest table_info, table_id={}", table_id);
table_info_json = TxnStructure::mvccGet(snap, db_key, table_key);
if (table_info_json.empty())
{
LOG_ERROR(
log,
"The table {} is dropped in TiKV, and the latest table_info is still empty, it should by gc",
"The table is dropped in TiKV, and the latest table_info is still empty, it should be GCed, "
"table_id={}",
table_id);
return nullptr;
}
}
LOG_DEBUG(log, "Get Table Info from TiKV : " + table_info_json);
LOG_DEBUG(log, "Get Table Info from TiKV: {}", table_info_json);
TiDB::TableInfoPtr table_info = std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id);

return table_info;
Expand All @@ -318,26 +320,26 @@ std::tuple<TiDB::DBInfoPtr, TiDB::TableInfoPtr> SchemaGetter::getDatabaseAndTabl
if (db_json.empty())
return std::make_tuple(nullptr, nullptr);

LOG_DEBUG(log, "Get DB Info from TiKV : " + db_json);
LOG_DEBUG(log, "Get DB Info from TiKV: {}", db_json);
auto db_info = std::make_shared<TiDB::DBInfo>(db_json, keyspace_id);

String table_key = getTableKey(table_id);
String table_info_json = TxnStructure::hGet(snap, db_key, table_key);
if (table_info_json.empty())
{
LOG_WARNING(log, "The table {} is dropped in TiKV, try to get the latest table_info", table_id);
LOG_WARNING(log, "The table is dropped in TiKV, try to get the latest table_info, table_id={}", table_id);
table_info_json = TxnStructure::mvccGet(snap, db_key, table_key);
if (table_info_json.empty())
{
LOG_ERROR(
log,
"The table {} is dropped in TiKV, and the latest table_info is still empty, it should by gc",
"The table is dropped in TiKV, and the latest table_info is still empty, it should be GCed, "
"table_id={}",
table_id);
return std::make_tuple(db_info, nullptr);
;
}
}
LOG_DEBUG(log, "Get Table Info from TiKV : " + table_info_json);
LOG_DEBUG(log, "Get Table Info from TiKV: {}", table_info_json);
TiDB::TableInfoPtr table_info = std::make_shared<TiDB::TableInfo>(table_info_json, keyspace_id);

return std::make_tuple(db_info, table_info);
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/TiDB/Schema/TiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ try
part_id_set.emplace(definition.id);
}

/// Treat `adding_definitions` and `dropping_definitions` as the normal `definitions`
/// in TiFlash, because TiFlash needs to create the physical IStorage instance
/// to handle the data on those partitions during DDL.

auto add_defs_json = json->getArray("adding_definitions");
if (!add_defs_json.isNull())
{
Expand Down Expand Up @@ -851,8 +855,6 @@ TableInfo::TableInfo(const String & table_info_json, KeyspaceID keyspace_id_)
String TableInfo::serialize() const
try
{
std::stringstream buf;

Poco::JSON::Object::Ptr json = new Poco::JSON::Object();
json->set("id", id);
json->set("keyspace_id", keyspace_id);
Expand All @@ -867,8 +869,8 @@ try
auto col_obj = col_info.getJSONObject();
cols_arr->add(col_obj);
}

json->set("cols", cols_arr);

Poco::JSON::Array::Ptr index_arr = new Poco::JSON::Array();
for (const auto & index_info : index_infos)
{
Expand Down Expand Up @@ -904,8 +906,8 @@ try

json->set("tiflash_replica", replica_info.getJSONObject());

std::stringstream buf;
json->stringify(buf);

return buf.str();
}
catch (const Poco::Exception & e)
Expand Down
Loading